Add storage function to purge history for a room
This commit is contained in:
parent
3de8168343
commit
a67bf0b074
|
@ -1281,6 +1281,146 @@ class EventsStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
|
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
|
||||||
|
|
||||||
|
def _delete_old_state_txn(self, txn, room_id, topological_ordering):
|
||||||
|
"""Deletes old room state
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Tables that should be pruned:
|
||||||
|
# event_auth
|
||||||
|
# event_backward_extremities
|
||||||
|
# event_content_hashes
|
||||||
|
# event_destinations
|
||||||
|
# event_edge_hashes
|
||||||
|
# event_edges
|
||||||
|
# event_forward_extremities
|
||||||
|
# event_json
|
||||||
|
# event_push_actions
|
||||||
|
# event_reference_hashes
|
||||||
|
# event_search
|
||||||
|
# event_signatures
|
||||||
|
# event_to_state_groups
|
||||||
|
# events
|
||||||
|
# rejections
|
||||||
|
# room_depth
|
||||||
|
# state_groups
|
||||||
|
# state_groups_state
|
||||||
|
|
||||||
|
# First ensure that we're not about to delete all the forward extremeties
|
||||||
|
txn.execute(
|
||||||
|
"SELECT e.event_id, e.depth FROM events as e "
|
||||||
|
"INNER JOIN event_forward_extremities as f "
|
||||||
|
"ON e.event_id = f.event_id "
|
||||||
|
"AND e.room_id = f.room_id "
|
||||||
|
"WHERE f.room_id = ?",
|
||||||
|
(room_id,)
|
||||||
|
)
|
||||||
|
rows = txn.fetchall()
|
||||||
|
max_depth = max(row[0] for row in rows)
|
||||||
|
|
||||||
|
if max_depth <= topological_ordering:
|
||||||
|
raise Exception("topological_ordering is greater than forward extremeties")
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
"SELECT event_id, state_key FROM events"
|
||||||
|
" LEFT JOIN state_events USING (room_id, event_id)"
|
||||||
|
" WHERE room_id = ? AND topological_ordering < ?",
|
||||||
|
(room_id, topological_ordering,)
|
||||||
|
)
|
||||||
|
event_rows = txn.fetchall()
|
||||||
|
|
||||||
|
# We calculate the new entries for the backward extremeties by finding
|
||||||
|
# all events that point to events that are to be purged
|
||||||
|
txn.execute(
|
||||||
|
"SELECT e.event_id FROM events as e"
|
||||||
|
" INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
|
||||||
|
" INNER JOIN events as e2 ON e2.event_id = ed.event_id"
|
||||||
|
" WHERE e.room_id = ? AND e.topological_ordering < ?"
|
||||||
|
" AND e2.topological_ordering >= ?",
|
||||||
|
(room_id, topological_ordering, topological_ordering)
|
||||||
|
)
|
||||||
|
new_backwards_extrems = txn.fetchall()
|
||||||
|
|
||||||
|
# Get all state groups that are only referenced by events that are
|
||||||
|
# to be deleted.
|
||||||
|
txn.execute(
|
||||||
|
"SELECT state_group FROM event_to_state_groups"
|
||||||
|
" INNER JOIN events USING (event_id)"
|
||||||
|
" WHERE state_group IN ("
|
||||||
|
" SELECT DISTINCT state_group FROM events"
|
||||||
|
" INNER JOIN event_to_state_groups USING (event_id)"
|
||||||
|
" WHERE room_id = ? AND topological_ordering < ?"
|
||||||
|
" )"
|
||||||
|
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
|
||||||
|
(room_id, topological_ordering, topological_ordering)
|
||||||
|
)
|
||||||
|
state_rows = txn.fetchall()
|
||||||
|
txn.executemany(
|
||||||
|
"DELETE FROM state_groups_state WHERE state_group = ?",
|
||||||
|
state_rows
|
||||||
|
)
|
||||||
|
txn.executemany(
|
||||||
|
"DELETE FROM state_groups WHERE id = ?",
|
||||||
|
state_rows
|
||||||
|
)
|
||||||
|
# Delete all non-state
|
||||||
|
txn.executemany(
|
||||||
|
"DELETE FROM event_to_state_groups WHERE event_id = ?",
|
||||||
|
[(event_id,) for event_id, _ in event_rows]
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
|
||||||
|
(topological_ordering, room_id,)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Delete all remote non-state events
|
||||||
|
to_delete = [
|
||||||
|
(event_id,) for event_id, state_key in event_rows
|
||||||
|
if state_key is None and not self.hs.is_mine_id(event_id)
|
||||||
|
]
|
||||||
|
to_not_delete = [
|
||||||
|
(event_id,) for event_id, state_key in event_rows
|
||||||
|
if state_key is not None or self.hs.is_mine_id(event_id)
|
||||||
|
]
|
||||||
|
for table in (
|
||||||
|
"events",
|
||||||
|
"event_json",
|
||||||
|
"event_auth",
|
||||||
|
"event_content_hashes",
|
||||||
|
"event_destinations",
|
||||||
|
"event_edge_hashes",
|
||||||
|
"event_edges",
|
||||||
|
"event_forward_extremities",
|
||||||
|
"event_push_actions",
|
||||||
|
"event_reference_hashes",
|
||||||
|
"event_search",
|
||||||
|
"event_signatures",
|
||||||
|
"rejections",
|
||||||
|
"event_backward_extremities",
|
||||||
|
):
|
||||||
|
txn.executemany(
|
||||||
|
"DELETE FROM %s WHERE event_id = ?" % (table,),
|
||||||
|
to_delete
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update backward extremeties
|
||||||
|
txn.executemany(
|
||||||
|
"INSERT INTO event_backward_extremities (room_id, event_id)"
|
||||||
|
" VALUES (?, ?)",
|
||||||
|
[(room_id, event_id) for event_id, in new_backwards_extrems]
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.executemany(
|
||||||
|
"DELETE FROM events WHERE event_id = ?",
|
||||||
|
to_delete
|
||||||
|
)
|
||||||
|
# Mark all state and own events as outliers
|
||||||
|
txn.executemany(
|
||||||
|
"UPDATE events SET outlier = ?"
|
||||||
|
" WHERE event_id = ?",
|
||||||
|
to_not_delete
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
AllNewEventsResult = namedtuple("AllNewEventsResult", [
|
AllNewEventsResult = namedtuple("AllNewEventsResult", [
|
||||||
"new_forward_events", "new_backfill_events",
|
"new_forward_events", "new_backfill_events",
|
||||||
|
|
Loading…
Reference in New Issue