Skip to content

dp3.database.snapshots

TypedSnapshotCollection

TypedSnapshotCollection(db: Database, entity_type: str, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict)

Bases: ABC

Snapshot collection handler with eid type awareness.

Source code in dp3/database/snapshots.py
def __init__(
    self,
    db: Database,
    entity_type: str,
    db_config: MongoConfig,
    model_spec: ModelSpec,
    snapshots_config: dict,
):
    """Bind the snapshot collections of a single entity type.

    Args:
        db: database holding the snapshot collections.
        entity_type: entity type whose snapshots this handler manages.
        db_config: database configuration, `storage.snapshot_bucket_size` is read from it.
        model_spec: model specification, must contain `entity_type`.
        snapshots_config: snapshot configuration used to derive the bucket time delta.

    Raises:
        ValueError: if `entity_type` is not an entity of `model_spec`.
    """
    self._db = db.with_options(codec_options=get_codec_options())

    if entity_type not in model_spec.entities:
        raise ValueError(f"Entity type '{entity_type}' not found in model spec")

    self.entity_type = entity_type
    # Bucketed snapshots and snapshots that did not fit into a bucket
    self._col_name, self._os_col_name = (
        f"{entity_type}#snapshots",
        f"{entity_type}#oversized_snapshots",
    )

    self.attr_specs: dict[str, AttrSpecType] = model_spec.entity_attributes[entity_type]
    self.log = logging.getLogger(f"EntityDatabase.SnapshotCollection[{entity_type}]")

    # eid sets for normal / oversized snapshot state
    # (presumably maintained by `_get_state` / `_cache_snapshot_state` - TODO confirm)
    self._normal_snapshot_eids = set()
    self._oversized_snapshot_eids = set()

    storage_config = db_config.storage
    self._snapshot_bucket_size = storage_config.snapshot_bucket_size
    self._bucket_delta = self._get_snapshot_bucket_delta(snapshots_config)

get_latest_one

get_latest_one(eid: AnyEidT) -> dict

Get latest snapshot of given eid.

If it doesn't exist, returns {}.

Source code in dp3/database/snapshots.py
def get_latest_one(self, eid: AnyEidT) -> dict:
    """Return the `last` snapshot stored for `eid`.

    The newest bucket of the eid (highest `_id`) is consulted. If it doesn't exist,
    or it has no `last` field, an empty dict is returned.
    """
    newest_bucket = self._col().find_one(
        self._filter_from_eid(eid), {"last": 1}, sort=[("_id", -1)]
    )
    if not newest_bucket:
        return {}
    return newest_bucket.get("last", {})

get_latest

get_latest(fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> tuple[Cursor, int]

Get latest snapshots of given etype.

This method is useful for displaying data on web.

Returns only documents matching generic_filter and fulltext_filters (dictionary attribute - fulltext filter). Fulltext filters are interpreted as regular expressions. Only string values may be filtered this way. There's no validation that queried attribute can be fulltext filtered. Only plain and observation attributes with string-based data types can be queried. Array and set data types are supported as well, as long as they are not multi value at the same time. If you need to filter EIDs, ensure the EID is string, then use attribute eid. Otherwise, use generic filter.

Generic filter allows filtering using generic MongoDB query (including $and, $or, $lt, etc.). For querying non-JSON-native types, you can use magic strings, such as "$$IPv4{<ip address>}" for IPv4 addresses. The full spec with examples is in the magic strings module.

Generic and fulltext filters are merged - fulltext overrides conflicting keys.

Also returns total document count (after filtering).

May raise SnapshotCollectionError if query is invalid.

Source code in dp3/database/snapshots.py
def get_latest(
    self,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> tuple[Cursor, int]:
    """Get latest snapshots of this entity type, together with their total count.

    This method is useful for displaying data on web.

    Only documents matching both `generic_filter` and `fulltext_filters`
    (mapping attribute -> fulltext filter) are returned.
    Fulltext filters are interpreted as regular expressions and only string values
    can be filtered this way; it is not validated that the attribute supports it.
    Only plain and observation attributes with string-based data types can be queried.
    Array and set data types are supported as well, as long as they are not
    multi value at the same time.
    To filter EIDs, ensure the EID is a string and use the attribute `eid`,
    otherwise use the generic filter.

    The generic filter is a generic MongoDB query (including `$and`, `$or`, `$lt`, etc.).
    Non-JSON-native types can be queried using magic strings, such as
    `"$$IPv4{<ip address>}"` for IPv4 addresses. The full spec with examples is in the
    [magic strings module][dp3.database.magic].

    Generic and fulltext filters are merged - fulltext overrides conflicting keys.

    Returns:
        Cursor over the matching documents (projected to `last`, sorted by `_id`)
        and the total number of matching documents.

    Raises:
        SnapshotCollectionError: if the query is invalid.
    """
    snapshot_col = self._col()
    query = self._prepare_latest_query(fulltext_filters or {}, generic_filter or {})

    try:
        matching = snapshot_col.find(query, {"last": 1}).sort([("_id", pymongo.ASCENDING)])
        total = snapshot_col.count_documents(query)
    except OperationFailure as e:
        raise SnapshotCollectionError(f"Query is invalid: {e}") from e
    return matching, total

find_latest

find_latest(fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> Cursor

Find latest snapshots of given etype.

See get_latest for more information.

Returns only documents matching generic_filter and fulltext_filters, does not count them.

Source code in dp3/database/snapshots.py
def find_latest(
    self,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> Cursor:
    """Find latest snapshots of this entity type without counting them.

    See [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for the meaning of `fulltext_filters` and `generic_filter`.

    Returns:
        Cursor over the matching documents, projected to `last` and sorted by `_id`.
    """
    query = self._prepare_latest_query(fulltext_filters or {}, generic_filter or {})
    try:
        # NOTE(review): pymongo cursors are lazy, so an invalid query will most likely
        # only fail once the returned cursor is iterated, outside of this `try`.
        unsorted = self._col().find(query, {"last": 1})
        return unsorted.sort([("_id", pymongo.ASCENDING)])
    except OperationFailure as e:
        raise SnapshotCollectionError(f"Query is invalid: {e}") from e

count_latest

count_latest(fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> int

Count latest snapshots of given etype.

See get_latest for more information.

Returns only count of documents matching generic_filter and fulltext_filters.

Note that this method may take much longer than get_latest on larger databases, as it counts all documents rather than returning just the first few.

Source code in dp3/database/snapshots.py
def count_latest(
    self,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> int:
    """Count latest snapshots of this entity type matching the given filters.

    See [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for the meaning of `fulltext_filters` and `generic_filter`.

    Note that this may take much longer than `get_latest` on larger databases,
    as every matching document is counted.

    Raises:
        SnapshotCollectionError: if the query is invalid.
    """
    query = self._prepare_latest_query(fulltext_filters or {}, generic_filter or {})
    try:
        matching = self._col().count_documents(query)
    except OperationFailure as e:
        raise SnapshotCollectionError(f"Query is invalid: {e}") from e
    return matching

get_by_eid

get_by_eid(eid: AnyEidT, t1: Optional[datetime] = None, t2: Optional[datetime] = None) -> Union[Cursor, CommandCursor]

Get all (or filtered) snapshots of given eid.

This method is useful for displaying eid's history on web.

Parameters:

Name Type Description Default
eid AnyEidT

id of entity, to which data-points correspond

required
t1 Optional[datetime]

left value of time interval (inclusive)

None
t2 Optional[datetime]

right value of time interval (inclusive)

None
Source code in dp3/database/snapshots.py
def get_by_eid(
    self, eid: AnyEidT, t1: Optional[datetime] = None, t2: Optional[datetime] = None
) -> Union[Cursor, CommandCursor]:
    """Get all (or time-filtered) snapshots of `eid`, ordered by creation time.

    This method is useful for displaying `eid`'s history on web.
    If the newest bucket of `eid` is marked `oversized`, the result of
    `_get_oversized` is returned instead of unwinding the bucket history.

    Args:
        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive)
        t2: right value of time interval (inclusive)
    """
    snapshot_col = self._col()

    newest_bucket = next(
        snapshot_col.find(self._filter_from_eid(eid), {"oversized": 1})
        .sort([("_id", -1)])
        .limit(1),
        None,
    )
    if newest_bucket and newest_bucket.get("oversized", False):
        return self._get_oversized(eid, t1, t2)

    # Flatten the history arrays of all buckets into standalone snapshot documents
    pipeline = [
        {"$match": self._filter_from_eid(eid)},
        {"$unwind": "$history"},
        {"$replaceRoot": {"newRoot": "$history"}},
    ]

    time_range = {}
    if t1:
        time_range["$gte"] = t1
    if t2:
        time_range["$lte"] = t2
    if time_range:
        pipeline.append({"$match": {"_time_created": time_range}})

    pipeline.append({"$sort": {"_time_created": pymongo.ASCENDING}})
    return snapshot_col.aggregate(pipeline)

get_distinct_val_count

get_distinct_val_count(attr: str) -> dict[Any, int]

Counts occurrences of distinct values of given attribute in snapshots.

Returns dictionary mapping value -> count.

Works for all plain and observation data types except dict and json.

Source code in dp3/database/snapshots.py
def get_distinct_val_count(self, attr: str) -> dict[Any, int]:
    """Count how many latest snapshots hold each distinct value of `attr`.

    Only buckets flagged `latest` are aggregated. Array/set values are unwound into
    their elements, and multi-value observations are unwound once more. For link
    data types the linked `eid` is counted. A `None` value is dropped from the result.

    Works for all plain and observation data types except `dict` and `json`.

    Returns:
        Mapping value -> number of occurrences.

    Raises:
        SnapshotCollectionError: if `attr` does not exist, is not plain/observations,
            or has a `dict`/`json` based data type.
    """
    try:
        attr_spec = self.attr_specs[attr]
    except KeyError as e:
        raise SnapshotCollectionError(f"Attribute '{attr}' does not exist") from e

    if attr_spec.t not in AttrType.PLAIN | AttrType.OBSERVATIONS:
        raise SnapshotCollectionError(f"Attribute '{attr}' isn't plain or observations")

    type_name = attr_spec.data_type.root
    if "dict" in type_name or "json" in type_name:
        raise SnapshotCollectionError(
            f"Data type '{attr_spec.data_type}' of attribute '{attr}' is not processable"
        )

    value_path = "$last." + attr

    # Each level of nesting (array/set container, multi-value observations) needs an unwind
    unwind_stages = []
    if "array" in type_name or "set" in type_name:
        unwind_stages.append({"$unwind": value_path})
    if attr_spec.t == AttrType.OBSERVATIONS and attr_spec.multi_value:
        unwind_stages.append({"$unwind": value_path})

    group_key = (value_path + ".eid") if "link" in type_name else value_path

    pipeline = [
        {"$match": {"latest": True}},
        *unwind_stages,
        {"$group": {"_id": group_key, "count": {"$sum": 1}}},
        {"$sort": {"_id": 1, "count": -1}},
    ]

    counts = {}
    for row in self._col().aggregate(pipeline):
        counts[row["_id"]] = row["count"]
    counts.pop(None, None)
    return counts

save_one

save_one(snapshot: dict, ctime: datetime)

Saves snapshot to specified entity of current master document.

Will move snapshot to oversized snapshots if the maintained bucket is too large.

Source code in dp3/database/snapshots.py
def save_one(self, snapshot: dict, ctime: datetime):
    """Saves snapshot to specified entity of current master document.

    Will move snapshot to oversized snapshots if the maintained bucket is too large.

    The snapshot dict is modified in place: `_time_created` is set to `ctime` and a
    possible `_id` key is removed (as `save_many` does). A snapshot without `eid`
    is logged and skipped.

    Args:
        snapshot: snapshot to save, must contain `eid`.
        ctime: creation time of the snapshot.

    Raises:
        WriteError / OperationFailure: if the bucket update fails for another reason
            than the document being too large.
        SnapshotCollectionError: if the bucket update fails with any other exception.
    """
    if "eid" not in snapshot:
        self.log.error("Snapshot is missing 'eid' field: %s", snapshot)
        return
    eid = snapshot["eid"]
    snapshot["_time_created"] = ctime
    # `save_many` strips `_id` as well. A leftover `_id` (e.g. copied from the master
    # record - TODO confirm) would be inserted as-is into the oversized collection,
    # where a repeated value collides on the unique `_id` index.
    snapshot.pop("_id", None)

    snapshot_col = self._col()
    os_col = self._os_col()

    # Find out if the snapshot is oversized
    normal, oversized = self._get_state({eid})
    if normal:
        try:
            # Append to a bucket that still has room, or start a new bucket (upsert)
            res = snapshot_col.update_one(
                self._filter_from_eid(eid) | {"count": {"$lt": self._snapshot_bucket_size}},
                {
                    "$set": {"last": snapshot},
                    "$push": {"history": {"$each": [snapshot], "$position": 0}},
                    "$inc": {"count": 1},
                    "$setOnInsert": {
                        "_id": self._bucket_id(eid, ctime),
                        "_time_created": ctime,
                        "oversized": False,
                        "latest": True,
                    },
                },
                upsert=True,
            )

            if res.upserted_id is not None:
                # A new bucket was started: un-flag the previous full bucket(s).
                # `$gte` because `save_many` increments `count` by a whole batch and can
                # push it past the bucket size; the new bucket itself is excluded.
                snapshot_col.update_many(
                    {
                        "$and": [
                            self._filter_from_eid(eid),
                            {
                                "_id": {"$ne": res.upserted_id},
                                "latest": True,
                                "count": {"$gte": self._snapshot_bucket_size},
                            },
                        ]
                    },
                    {"$unset": {"latest": 1}},
                )
        except (WriteError, OperationFailure, DocumentTooLarge) as e:
            # DocumentTooLarge is raised client-side and, unlike the server errors,
            # carries no `code` attribute - it always means the document is too large.
            if not isinstance(e, DocumentTooLarge) and e.code != BSON_OBJECT_TOO_LARGE:
                raise
            # The snapshot is too large, move it to oversized snapshots
            self.log.info(f"Snapshot of {eid} is too large: {e}, marking as oversized.")
            self._migrate_to_oversized(eid, snapshot)
            self._cache_snapshot_state(set(), {eid})
        except Exception as e:
            raise SnapshotCollectionError(
                f"Insert of snapshot {eid} failed: {e}, {snapshot}"
            ) from e
        return
    elif oversized:
        # Snapshot is already marked as oversized
        snapshot_col.update_one(self._filter_from_eid(eid), {"$set": {"last": snapshot}})
        os_col.insert_one(snapshot)
        return

save_many

save_many(snapshots: list[dict], ctime: datetime)

Saves a list of snapshots of current master documents.

All snapshots must belong to same entity type.

Will move snapshots to oversized snapshots if the maintained bucket is too large. For better understanding, see save_one().

Source code in dp3/database/snapshots.py
def save_many(self, snapshots: list[dict], ctime: datetime):
    """
    Saves a list of snapshots of current master documents.

    All snapshots must belong to same entity type.

    Will move snapshots to oversized snapshots if the maintained bucket is too large.
    For better understanding, see `save_one()`.

    Every snapshot is modified in place: `_time_created` is set to `ctime` and `_id`
    is removed. Snapshots without an `eid` field are skipped. When an `eid` has several
    snapshots in the list, the last one of them is stored as its `last` snapshot.

    Args:
        snapshots: snapshots to save.
        ctime: creation time assigned to every snapshot.

    Raises:
        BulkWriteError: if a bucket update fails for another reason than the document
            growing too large (too-large buckets are migrated to oversized first).
        OperationFailure: if the bucket bulk write fails without per-document errors.
        SnapshotCollectionError: if any other database operation fails.
    """
    for snapshot in snapshots:
        snapshot["_time_created"] = ctime

    snapshot_col = self._col()
    os_col = self._os_col()

    snapshots_by_eid = defaultdict(list)
    for snapshot in snapshots:
        if "eid" not in snapshot:
            continue
        if "_id" in snapshot:
            del snapshot["_id"]
        snapshots_by_eid[snapshot["eid"]].append(snapshot)

    # Find out if any of the snapshots are oversized
    normal, oversized = self._get_state(set(snapshots_by_eid.keys()))

    upserts = []
    # Parallel to `upserts`, so write error indexes map back to the snapshots of an eid
    update_originals: list[list[dict]] = []
    oversized_inserts = []
    oversized_updates = []

    # A normal snapshot: append to a bucket with room (or start a new one), update last
    for eid in normal:
        eid_snapshots = snapshots_by_eid[eid]
        upserts.append(
            UpdateOne(
                self._filter_from_eid(eid) | {"count": {"$lt": self._snapshot_bucket_size}},
                {
                    "$set": {"last": eid_snapshots[-1]},
                    "$push": {"history": {"$each": eid_snapshots, "$position": 0}},
                    "$inc": {"count": len(eid_snapshots)},
                    "$setOnInsert": {
                        "_id": self._bucket_id(eid, ctime),
                        "_time_created": ctime,
                        "oversized": False,
                        "latest": True,
                    },
                },
                upsert=True,
            )
        )
        update_originals.append(eid_snapshots)

    # Snapshot is already marked as oversized
    for eid in oversized:
        oversized_inserts.extend(snapshots_by_eid[eid])
        oversized_updates.append(
            UpdateOne(
                self._filter_from_eid(eid),
                {"$set": {"last": snapshots_by_eid[eid][-1]}},
            )
        )

    new_oversized = set()

    if upserts:
        try:
            res = snapshot_col.bulk_write(upserts, ordered=False)
        except (BulkWriteError, OperationFailure) as e:
            details = e.details or {}
            write_errors = details.get("writeErrors", [])
            if not write_errors:
                # No per-document errors to recover from
                raise
            # The bulk write is unordered, so upserts without an error were applied anyway
            # and need the same `latest` bookkeeping as on success.
            self._unset_superseded_latest(
                snapshot_col, [upserted["_id"] for upserted in details.get("upserted", [])]
            )

            too_large = [
                err["index"] for err in write_errors if err["code"] == BSON_OBJECT_TOO_LARGE
            ]
            if too_large:
                self.log.info("Update of snapshots failed, will retry with oversize.")
            for index in too_large:
                eid_snapshots = update_originals[index]
                eid = eid_snapshots[0]["eid"]
                # All snapshots share `_time_created == ctime`, so list order decides
                # recency: the last one is the newest, matching `last` in the upsert.
                self._migrate_to_oversized(eid, eid_snapshots[-1])
                oversized_inserts.extend(eid_snapshots[:-1])
                new_oversized.add(eid)

            if len(too_large) != len(write_errors):
                # Some other error occurred
                raise
        except Exception as e:
            raise SnapshotCollectionError(f"Upsert of snapshots failed: {str(e)[:2048]}") from e
        else:
            self._unset_superseded_latest(snapshot_col, list(res.upserted_ids.values()))
            if res.modified_count + res.upserted_count != len(upserts):
                self.log.error(
                    "Some snapshots were not updated, %s != %s",
                    res.modified_count + res.upserted_count,
                    len(upserts),
                )

    # Update the oversized snapshots
    if oversized_inserts:
        try:
            if oversized_updates:
                snapshot_col.bulk_write(oversized_updates)
            os_col.insert_many(oversized_inserts)
        except Exception as e:
            raise SnapshotCollectionError(f"Insert of snapshots failed: {str(e)[:2048]}") from e

    # Cache the new state
    self._cache_snapshot_state(set(), new_oversized)


def _unset_superseded_latest(self, snapshot_col, upserted_ids: list) -> None:
    """Remove the `latest` flag from full buckets superseded by newly upserted buckets.

    Args:
        snapshot_col: the snapshot collection the buckets were upserted into.
        upserted_ids: `_id`s of the buckets just created by upserts.

    Raises:
        SnapshotCollectionError: if the update fails.
    """
    if not upserted_ids:
        return
    unset_latest_updates = [
        UpdateMany(
            {
                "$and": [
                    self._filter_from_bid(bucket_id),
                    {
                        # Never un-flag the freshly created bucket itself
                        "_id": {"$ne": bucket_id},
                        "latest": True,
                        # `$gte`: a multi-snapshot `$inc` can push `count` past the size
                        "count": {"$gte": self._snapshot_bucket_size},
                    },
                ]
            },
            {"$unset": {"latest": 1}},
        )
        for bucket_id in upserted_ids
    ]
    try:
        up_res = snapshot_col.bulk_write(unset_latest_updates)
    except Exception as e:
        raise SnapshotCollectionError(
            f"Unset of latest snapshot buckets failed: {str(e)[:2048]}"
        ) from e
    if up_res.modified_count != len(upserted_ids):
        self.log.info(
            "Upserted the first snapshot for %d entities.",
            len(upserted_ids) - up_res.modified_count,
        )

delete_old

delete_old(t_old: datetime) -> int

Delete old snapshots.

Periodically called from HistoryManager.

Source code in dp3/database/snapshots.py
def delete_old(self, t_old: datetime) -> int:
    """Delete old snapshots.

    Periodically called from HistoryManager.

    Buckets are deleted when created at or before `t_old - bucket delta`
    (presumably so that every snapshot of a deleted bucket is older than `t_old` -
    TODO confirm). Oversized snapshots are deleted when created before `t_old`.

    Returns:
        Estimated number of deleted snapshots: every deleted bucket counts as
        `snapshot_bucket_size` snapshots, oversized snapshots are counted individually.

    Raises:
        SnapshotCollectionError: if a deletion fails.
    """
    deleted = 0
    try:
        res = self._col().delete_many(
            {"_time_created": {"$lte": t_old - self._bucket_delta}},
        )
        deleted += res.deleted_count * self._snapshot_bucket_size
        res = self._os_col().delete_many({"_time_created": {"$lt": t_old}})
        deleted += res.deleted_count
    except Exception as e:
        raise SnapshotCollectionError(f"Delete of old snapshots failed: {e}") from e
    return deleted

delete_eid

delete_eid(eid: AnyEidT)

Delete all snapshots of eid.

Source code in dp3/database/snapshots.py
def delete_eid(self, eid: AnyEidT):
    """Delete all snapshots of `eid`, both bucketed and oversized.

    The logged number of deleted bucketed snapshots is an estimate
    (deleted buckets * bucket size), not an exact count.

    Raises:
        SnapshotCollectionError: if either deletion fails.
    """
    try:
        res = self._col().delete_many(self._filter_from_eid(eid))
        del_cnt = res.deleted_count * self._snapshot_bucket_size
        self.log.debug("Deleted %s snapshots of %s/%s.", del_cnt, self.entity_type, eid)

        res = self._os_col().delete_many({"eid": eid})
        self.log.debug(
            "Deleted %s oversized snapshots of %s/%s.", res.deleted_count, self.entity_type, eid
        )
    except Exception as e:
        raise SnapshotCollectionError(f"Delete of snapshots failed: {e}\n{eid}") from e

delete_eids

delete_eids(eids: list[Any])

Delete all snapshots of eids.

Source code in dp3/database/snapshots.py
def delete_eids(self, eids: list[Any]):
    """Delete all snapshots (bucketed and oversized) of every eid in `eids`.

    The logged number of deleted bucketed snapshots is estimated as
    deleted buckets * bucket size.

    Raises:
        SnapshotCollectionError: if either deletion fails.
    """
    try:
        bucket_res = self._col().delete_many(self._filter_from_eids(eids))
        self.log.debug(
            "Deleted %s snapshots of %s (%s).",
            bucket_res.deleted_count * self._snapshot_bucket_size,
            self.entity_type,
            len(eids),
        )
        oversized_res = self._os_col().delete_many({"eid": {"$in": eids}})
        self.log.debug(
            "Deleted %s oversized snapshots of %s (%s).",
            oversized_res.deleted_count,
            self.entity_type,
            len(eids),
        )
    except Exception as e:
        raise SnapshotCollectionError(f"Delete of snapshots failed: {e}\n{eids}") from e

BinaryEidSnapshots

BinaryEidSnapshots(db: Database, entity_type: str, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict)

Bases: TypedSnapshotCollection, ABC

Source code in dp3/database/snapshots.py
def __init__(
    self,
    db: Database,
    entity_type: str,
    db_config: MongoConfig,
    model_spec: ModelSpec,
    snapshots_config: dict,
):
    """Bind the snapshot collections of a single entity type.

    Args:
        db: database holding the snapshot collections.
        entity_type: entity type whose snapshots this handler manages.
        db_config: database configuration, `storage.snapshot_bucket_size` is read from it.
        model_spec: model specification, must contain `entity_type`.
        snapshots_config: snapshot configuration used to derive the bucket time delta.

    Raises:
        ValueError: if `entity_type` is not an entity of `model_spec`.
    """
    self._db = db.with_options(codec_options=get_codec_options())

    if entity_type not in model_spec.entities:
        raise ValueError(f"Entity type '{entity_type}' not found in model spec")

    self.entity_type = entity_type
    # Bucketed snapshots and snapshots that did not fit into a bucket
    self._col_name, self._os_col_name = (
        f"{entity_type}#snapshots",
        f"{entity_type}#oversized_snapshots",
    )

    self.attr_specs: dict[str, AttrSpecType] = model_spec.entity_attributes[entity_type]
    self.log = logging.getLogger(f"EntityDatabase.SnapshotCollection[{entity_type}]")

    # eid sets for normal / oversized snapshot state
    # (presumably maintained by `_get_state` / `_cache_snapshot_state` - TODO confirm)
    self._normal_snapshot_eids = set()
    self._oversized_snapshot_eids = set()

    storage_config = db_config.storage
    self._snapshot_bucket_size = storage_config.snapshot_bucket_size
    self._bucket_delta = self._get_snapshot_bucket_delta(snapshots_config)

SnapshotCollectionContainer

SnapshotCollectionContainer(db: Database, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict)

Container for all required snapshot collections, exposing the public interface.

Source code in dp3/database/snapshots.py
def __init__(
    self, db: Database, db_config: MongoConfig, model_spec: ModelSpec, snapshots_config: dict
):
    """Create a snapshot collection handler for every entity type of `model_spec`.

    The handler class is looked up in `entity_type2collection` by the id data type
    of each entity.
    """
    self._snapshot_collections = handlers = {}
    for entity_type, entity_spec in model_spec.entities.items():
        handler_cls: type[TypedSnapshotCollection] = entity_type2collection[
            entity_spec.id_data_type.root
        ]
        handlers[entity_type] = handler_cls(
            db, entity_type, db_config, model_spec, snapshots_config
        )

    self.log = logging.getLogger("EntityDatabase.SnapshotCollections")

save_one

save_one(entity_type: str, snapshot: dict, ctime: datetime) -> None

Save snapshot to specified entity of current master document.

Source code in dp3/database/snapshots.py
def save_one(self, entity_type: str, snapshot: dict, ctime: datetime) -> None:
    """Save a single `snapshot` of an entity of type `entity_type`, created at `ctime`.

    Delegates to the snapshot collection handler of `entity_type`.
    """
    collection = self[entity_type]
    return collection.save_one(snapshot, ctime)

save_many

save_many(entity_type: str, snapshots: list[dict], ctime: datetime) -> None

Saves a list of snapshots of current master documents.

All snapshots must belong to same entity type.

Source code in dp3/database/snapshots.py
def save_many(self, entity_type: str, snapshots: list[dict], ctime: datetime) -> None:
    """Save a batch of snapshots of current master documents, created at `ctime`.

    All snapshots must belong to the same entity type, `entity_type`, whose
    snapshot collection handler does the actual work.
    """
    collection = self[entity_type]
    return collection.save_many(snapshots, ctime)

get_latest_one

get_latest_one(entity_type: str, eid: AnyEidT) -> dict

Get latest snapshot of given eid.

If it doesn't exist, returns {}.

Source code in dp3/database/snapshots.py
def get_latest_one(self, entity_type: str, eid: AnyEidT) -> dict:
    """Get the latest snapshot of `eid` of type `entity_type`.

    If it doesn't exist, returns {}.
    """
    collection = self[entity_type]
    return collection.get_latest_one(eid)

get_latest

get_latest(entity_type: str, fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> tuple[Cursor, int]

Get latest snapshots of given etype.

This method is useful for displaying data on web.

Returns only documents matching generic_filter and fulltext_filters (dictionary attribute - fulltext filter). Fulltext filters are interpreted as regular expressions. Only string values may be filtered this way. There's no validation that queried attribute can be fulltext filtered. Only plain and observation attributes with string-based data types can be queried. Array and set data types are supported as well, as long as they are not multi value at the same time. If you need to filter EIDs, ensure the EID is string, then use attribute eid. Otherwise, use generic filter.

Generic filter allows filtering using generic MongoDB query (including $and, $or, $lt, etc.). For querying non-JSON-native types, you can use magic strings, such as "$$IPv4{<ip address>}" for IPv4 addresses. The full spec with examples is in the magic strings module.

Generic and fulltext filters are merged - fulltext overrides conflicting keys.

Also returns total document count (after filtering).

May raise SnapshotCollectionError if query is invalid.

Source code in dp3/database/snapshots.py
def get_latest(
    self,
    entity_type: str,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> tuple[Cursor, int]:
    """Get latest snapshots of `entity_type`, together with their total count.

    This method is useful for displaying data on web.

    Only documents matching both `generic_filter` and `fulltext_filters`
    (mapping attribute -> fulltext filter) are returned.
    Fulltext filters are interpreted as regular expressions and only string values
    can be filtered this way; it is not validated that the attribute supports it.
    Only plain and observation attributes with string-based data types can be queried.
    Array and set data types are supported as well, as long as they are not
    multi value at the same time.
    To filter EIDs, ensure the EID is a string and use the attribute `eid`,
    otherwise use the generic filter.

    The generic filter is a generic MongoDB query (including `$and`, `$or`, `$lt`, etc.).
    Non-JSON-native types can be queried using magic strings, such as
    `"$$IPv4{<ip address>}"` for IPv4 addresses. The full spec with examples is in the
    [magic strings module][dp3.database.magic].

    Generic and fulltext filters are merged - fulltext overrides conflicting keys.

    Also returns total document count (after filtering).

    May raise `SnapshotCollectionError` if query is invalid.
    """
    collection = self[entity_type]
    return collection.get_latest(fulltext_filters, generic_filter)

find_latest

find_latest(entity_type: str, fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> Cursor

Find latest snapshots of given etype.

See get_latest for more information.

Returns only documents matching generic_filter and fulltext_filters, does not count them.

Source code in dp3/database/snapshots.py
def find_latest(
    self,
    entity_type: str,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> Cursor:
    """Find latest snapshots of `entity_type` without counting them.

    See [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for the meaning of `fulltext_filters` and `generic_filter`.
    """
    collection = self[entity_type]
    return collection.find_latest(fulltext_filters, generic_filter)

count_latest

count_latest(entity_type: str, fulltext_filters: Optional[dict[str, str]] = None, generic_filter: Optional[dict[str, Any]] = None) -> int

Count latest snapshots of given etype.

See get_latest for more information.

Returns only count of documents matching generic_filter and fulltext_filters.

Note that this method may take much longer than get_latest on larger databases, as it counts all documents rather than returning just the first few.

Source code in dp3/database/snapshots.py
def count_latest(
    self,
    entity_type: str,
    fulltext_filters: Optional[dict[str, str]] = None,
    generic_filter: Optional[dict[str, Any]] = None,
) -> int:
    """Count latest snapshots of `entity_type` matching the given filters.

    See [`get_latest`][dp3.database.snapshots.SnapshotCollectionContainer.get_latest]
    for the meaning of `fulltext_filters` and `generic_filter`.

    Note that this may take much longer than `get_latest` on larger databases,
    as every matching document is counted.
    """
    collection = self[entity_type]
    return collection.count_latest(fulltext_filters, generic_filter)

get_by_eid

get_by_eid(entity_type: str, eid: AnyEidT, t1: Optional[datetime] = None, t2: Optional[datetime] = None) -> Union[Cursor, CommandCursor]

Get all (or filtered) snapshots of given eid.

This method is useful for displaying eid's history on web.

Parameters:

Name Type Description Default
entity_type str

name of entity type

required
eid AnyEidT

id of entity, to which data-points correspond

required
t1 Optional[datetime]

left value of time interval (inclusive)

None
t2 Optional[datetime]

right value of time interval (inclusive)

None
Source code in dp3/database/snapshots.py
def get_by_eid(
    self,
    entity_type: str,
    eid: AnyEidT,
    t1: Optional[datetime] = None,
    t2: Optional[datetime] = None,
) -> Union[Cursor, CommandCursor]:
    """Get all (or time-filtered) snapshots of `eid`.

    This method is useful for displaying `eid`'s history on web.

    Args:
        entity_type: name of entity type
        eid: id of entity, to which data-points correspond
        t1: left value of time interval (inclusive)
        t2: right value of time interval (inclusive)
    """
    collection = self[entity_type]
    return collection.get_by_eid(eid, t1, t2)

get_distinct_val_count

get_distinct_val_count(entity_type: str, attr: str) -> dict[Any, int]

Counts occurrences of distinct values of given attribute in snapshots.

Returns dictionary mapping value -> count.

Works for all plain and observation data types except dict and json.

Source code in dp3/database/snapshots.py
def get_distinct_val_count(self, entity_type: str, attr: str) -> dict[Any, int]:
    """Count occurrences of distinct values of `attr` in snapshots of `entity_type`.

    Returns a mapping value -> count.

    Works for all plain and observation data types except `dict` and `json`.
    """
    collection = self[entity_type]
    return collection.get_distinct_val_count(attr)

delete_old

delete_old(t_old: datetime) -> int

Delete old snapshots, may raise SnapshotCollectionError.

Periodically called from HistoryManager. Returns the number of deleted snapshots.

Source code in dp3/database/snapshots.py
def delete_old(self, t_old: datetime) -> int:
    """Delete old snapshots of all entity types, may raise `SnapshotCollectionError`.

    Periodically called from HistoryManager.

    Returns:
        Sum of the deleted snapshot counts reported by every per-type collection.
    """
    total = 0
    for typed_collection in self._snapshot_collections.values():
        try:
            total = total + typed_collection.delete_old(t_old)
        except Exception as e:
            raise SnapshotCollectionError(f"Delete of old snapshots failed: {e}") from e
    return total

delete_eid

delete_eid(entity_type: str, eid: AnyEidT) -> int

Delete snapshots of given eid.

Source code in dp3/database/snapshots.py
def delete_eid(self, entity_type: str, eid: AnyEidT) -> int:
    """Delete snapshots of the given `eid` of type `entity_type`.

    NOTE(review): annotated `-> int`, but this only passes through whatever the
    per-type collection's `delete_eid` returns.
    """
    collection = self[entity_type]
    return collection.delete_eid(eid)

delete_eids

delete_eids(entity_type: str, eids: list[Any]) -> int

Delete snapshots of given eids.

Source code in dp3/database/snapshots.py
def delete_eids(self, entity_type: str, eids: list[Any]) -> int:
    """Delete snapshots of all given `eids` of type `entity_type`.

    NOTE(review): annotated `-> int`, but this only passes through whatever the
    per-type collection's `delete_eids` returns.
    """
    collection = self[entity_type]
    return collection.delete_eids(eids)