from collections import defaultdict

import bson
from pymongo import UpdateOne
from pymongo.errors import BulkWriteError, DocumentTooLarge, OperationFailure

# `db`, `DatabaseError`, `BSON_OBJECT_TOO_LARGE` (MongoDB server error code 10334)
# and `_migrate_to_oversized_snapshot` are assumed to be defined elsewhere in this module.


def save_snapshots(etype: str, snapshots: list[dict]):
    """
    Saves a list of snapshots of current master documents.

    All snapshots must belong to the same entity type. If a maintained bucket
    document grows beyond the BSON size limit, its snapshots are moved to the
    oversized snapshots collection. For details, see `save_snapshot()`.
    """
snapshots_by_eid = defaultdict(list)
for snapshot in snapshots:
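        # Skip snapshots without an entity ID and drop any stale "_id",
        # so it is not stored inside the bucket arrays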
if "eid" not in snapshot:
continue
if "_id" in snapshot:
del snapshot["_id"]
snapshots_by_eid[snapshot["eid"]].append(snapshot)
print(f"Saving {len(snapshots)} snapshots of {len(snapshots_by_eid)} entities of {etype}")
snapshot_col = f"{etype}#snapshots"
os_col = f"{etype}#snapshots_oversized"
    # Fetch the existing bucket documents to see which entities are already oversized
docs = list(
db[snapshot_col].find(
{"_id": {"$in": list(snapshots_by_eid.keys())}}, {"oversized": 1, "eid": 1}
)
)
updates = []
update_originals = []
oversized_inserts = []
oversized_updates = []
for doc in docs:
eid = doc["_id"]
if not doc.get("oversized", False):
            # A regular bucket: shift the current "last" into history
            # and set the newest incoming snapshot as "last"
updates.append(
UpdateOne(
{"_id": eid},
[
{
"$set": {
"history": {
"$concatArrays": [
snapshots_by_eid[eid][:-1],
["$last"],
"$history",
]
},
"count": {"$sum": [len(snapshots_by_eid[eid]), "$count"]},
}
},
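                        # "last" is overwritten in a separate stage, after the
                        # history above has been built from the previous "$last";
                        # $literal stores the snapshot verbatim so its contents
                        # are not evaluated as aggregation expressions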
{"$set": {"last": {"$literal": snapshots_by_eid[eid][-1]}}},
],
)
)
update_originals.append(snapshots_by_eid[eid])
        else:
            # The bucket is already oversized - its snapshots go directly to the
            # oversized collection, only "last" is updated in the main document
            oversized_inserts.extend(snapshots_by_eid[eid])
            oversized_updates.append(
                UpdateOne({"_id": eid}, {"$set": {"last": snapshots_by_eid[eid][-1]}})
            )
        # This entity already has a bucket document - remove it from the dict,
        # so that only entities without one remain
        del snapshots_by_eid[eid]
    # The remaining snapshots belong to new entities - create their bucket documents
inserts = [
{
"_id": eid,
"last": eid_snapshots[-1],
"history": eid_snapshots[:-1],
"oversized": False,
"count": len(eid_snapshots) - 1,
}
for eid, eid_snapshots in snapshots_by_eid.items()
]
if updates:
try:
res = db[snapshot_col].bulk_write(updates, ordered=False)
            if res.modified_count != len(updates):
                print(
                    f"Some snapshots were not updated, "
                    f"{res.modified_count} != {len(updates)}"
                )
        except (BulkWriteError, OperationFailure) as e:
            print("Update of snapshots failed, moving oversized buckets aside.")
            write_errors = (e.details or {}).get("writeErrors", [])
            failed_indexes = [
                err["index"]
                for err in write_errors
                if err["code"] == BSON_OBJECT_TOO_LARGE
            ]
            for eid_snapshots in (update_originals[i] for i in failed_indexes):
                eid = eid_snapshots[0]["eid"]
                # Newest snapshot first: it stays as "last" in the migrated
                # bucket, the older ones go to the oversized collection
                sorted_snapshots = sorted(
                    eid_snapshots, key=lambda s: s["_time_created"], reverse=True
                )
                _migrate_to_oversized_snapshot(etype, eid, sorted_snapshots[0])
                oversized_inserts.extend(sorted_snapshots[1:])
            if any(err["code"] != BSON_OBJECT_TOO_LARGE for err in write_errors):
                # Some other error occurred, do not swallow it
                raise
except Exception as e:
raise DatabaseError(f"Update of snapshots failed: {str(e)[:2048]}") from e
if inserts:
try:
# Insert new snapshots
res = db[snapshot_col].insert_many(inserts, ordered=False)
            if len(res.inserted_ids) != len(inserts):
                print(
                    f"Some snapshots were not inserted, "
                    f"{len(res.inserted_ids)} != {len(inserts)}"
                )
except (DocumentTooLarge, OperationFailure) as e:
print(f"Snapshot too large: {e}")
            checked_inserts = []
            # Filter out the oversized snapshots; extend oversized_inserts
            # rather than resetting it - it may already contain snapshots
            # collected in the update path above
for insert_doc in inserts:
bsize = len(bson.BSON.encode(insert_doc))
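                # MongoDB's maximum BSON document size is 16 MiB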
if bsize < 16 * 1024 * 1024:
checked_inserts.append(insert_doc)
else:
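                    # Too large for one bucket document: keep only an oversized
                    # marker here and move all snapshots to the oversized collection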
eid = insert_doc["_id"]
checked_inserts.append(
{
"_id": eid,
"last": insert_doc["last"],
"oversized": True,
"history": [],
"count": 0,
}
)
oversized_inserts.extend(insert_doc["history"] + [insert_doc["last"]])
try:
db[snapshot_col].insert_many(checked_inserts, ordered=False)
            except Exception as e:
                raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e
        except Exception as e:
            raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e
    # Write snapshots belonging to oversized buckets and update their "last"
    if oversized_inserts:
        try:
            if oversized_updates:
                db[snapshot_col].bulk_write(oversized_updates)
            db[os_col].insert_many(oversized_inserts)
        except Exception as e:
            raise DatabaseError(
                f"Update of oversized snapshots failed: {str(e)[:2048]}"
            ) from e