Clear event caches when we purge history (#15609)
This should help a little with #13476 --------- Co-authored-by: Patrick Cloke <patrickc@matrix.org>
This commit is contained in:
parent
d162aecaac
commit
c485ed1c5a
|
@ -0,0 +1 @@
|
|||
Correctly clear caches when we delete a room.
|
|
@ -86,9 +86,14 @@ class SQLBaseStore(metaclass=ABCMeta):
|
|||
room_id: Room where state changed
|
||||
members_changed: The user_ids of members that have changed
|
||||
"""
|
||||
|
||||
# XXX: If you add something to this function make sure you add it to
|
||||
# `_invalidate_state_caches_all` as well.
|
||||
|
||||
# If there were any membership changes, purge the appropriate caches.
|
||||
for host in {get_domain_from_id(u) for u in members_changed}:
|
||||
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
|
||||
self._attempt_to_invalidate_cache("is_host_invited", (room_id, host))
|
||||
if members_changed:
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
|
||||
|
@ -117,6 +122,32 @@ class SQLBaseStore(metaclass=ABCMeta):
|
|||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
|
||||
def _invalidate_state_caches_all(self, room_id: str) -> None:
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
not stream invalidations down replication.
|
||||
|
||||
Same as `_invalidate_state_caches`, except that works when we don't know
|
||||
which memberships have changed.
|
||||
|
||||
Args:
|
||||
room_id: Room where state changed
|
||||
"""
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("is_host_invited", None)
|
||||
self._attempt_to_invalidate_cache("is_host_joined", None)
|
||||
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
|
||||
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user_with_stream_ordering", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
) -> bool:
|
||||
|
|
|
@ -46,6 +46,12 @@ logger = logging.getLogger(__name__)
|
|||
# based on the current state when notifying workers over replication.
|
||||
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
|
||||
|
||||
# As above, but for invalidating event caches on history deletion
|
||||
PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
|
||||
|
||||
# As above, but for invalidating room caches on room deletion
|
||||
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
|
||||
|
||||
|
||||
class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
def __init__(
|
||||
|
@ -175,6 +181,23 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
room_id = row.keys[0]
|
||||
members_changed = set(row.keys[1:])
|
||||
self._invalidate_state_caches(room_id, members_changed)
|
||||
elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
|
||||
if row.keys is None:
|
||||
raise Exception(
|
||||
"Can't send an 'invalidate all' for 'purge history' cache"
|
||||
)
|
||||
|
||||
room_id = row.keys[0]
|
||||
self._invalidate_caches_for_room_events(room_id)
|
||||
elif row.cache_func == DELETE_ROOM_CACHE_NAME:
|
||||
if row.keys is None:
|
||||
raise Exception(
|
||||
"Can't send an 'invalidate all' for 'delete room' cache"
|
||||
)
|
||||
|
||||
room_id = row.keys[0]
|
||||
self._invalidate_caches_for_room_events(room_id)
|
||||
self._invalidate_caches_for_room(room_id)
|
||||
else:
|
||||
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
|
||||
|
||||
|
@ -226,6 +249,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
relates_to: Optional[str],
|
||||
backfilled: bool,
|
||||
) -> None:
|
||||
# XXX: If you add something to this function make sure you add it to
|
||||
# `_invalidate_caches_for_room_events` as well.
|
||||
|
||||
# This invalidates any local in-memory cached event objects, the original
|
||||
# process triggering the invalidation is responsible for clearing any external
|
||||
# cached objects.
|
||||
|
@ -271,6 +297,106 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,))
|
||||
|
||||
def _invalidate_caches_for_room_events_and_stream(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> None:
|
||||
"""Invalidate caches associated with events in a room, and stream to
|
||||
replication.
|
||||
|
||||
Used when we delete events a room, but don't know which events we've
|
||||
deleted.
|
||||
"""
|
||||
|
||||
self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id])
|
||||
txn.call_after(self._invalidate_caches_for_room_events, room_id)
|
||||
|
||||
def _invalidate_caches_for_room_events(self, room_id: str) -> None:
|
||||
"""Invalidate caches associated with events in a room, and stream to
|
||||
replication.
|
||||
|
||||
Used when we delete events in a room, but don't know which events we've
|
||||
deleted.
|
||||
"""
|
||||
|
||||
self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined]
|
||||
|
||||
self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||
)
|
||||
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
self._attempt_to_invalidate_cache("get_relations_for_event", None)
|
||||
self._attempt_to_invalidate_cache("get_applicable_edit", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_id", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user_with_stream_ordering", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_references_for_event", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_summary", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_participated", None)
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,))
|
||||
|
||||
self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
|
||||
|
||||
self._attempt_to_invalidate_cache("get_event_ordering", None)
|
||||
self._attempt_to_invalidate_cache("is_partial_state_event", None)
|
||||
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
|
||||
|
||||
def _invalidate_caches_for_room_and_stream(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> None:
|
||||
"""Invalidate caches associated with rooms, and stream to replication.
|
||||
|
||||
Used when we delete rooms.
|
||||
"""
|
||||
|
||||
self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
|
||||
txn.call_after(self._invalidate_caches_for_room, room_id)
|
||||
|
||||
def _invalidate_caches_for_room(self, room_id: str) -> None:
|
||||
"""Invalidate caches associated with rooms.
|
||||
|
||||
Used when we delete rooms.
|
||||
"""
|
||||
|
||||
# If we've deleted the room then we also need to purge all event caches.
|
||||
self._invalidate_caches_for_room_events(room_id)
|
||||
|
||||
self._attempt_to_invalidate_cache("get_account_data_for_room", None)
|
||||
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
|
||||
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_linearized_receipts_for_room", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("is_room_blocked", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_partial_state_servers_at_join", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_current_hosts_in_room_ordered", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
|
||||
|
||||
# And delete state caches.
|
||||
|
||||
self._invalidate_state_caches_all(room_id)
|
||||
|
||||
async def invalidate_cache_and_stream(
|
||||
self, cache_name: str, keys: Tuple[Any, ...]
|
||||
) -> None:
|
||||
|
@ -377,6 +503,14 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||
"Can't stream invalidate all with magic current state cache"
|
||||
)
|
||||
|
||||
if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None:
|
||||
raise Exception(
|
||||
"Can't stream invalidate all with magic purge history cache"
|
||||
)
|
||||
|
||||
if cache_name == DELETE_ROOM_CACHE_NAME and keys is None:
|
||||
raise Exception("Can't stream invalidate all with magic delete room cache")
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
assert self._cache_id_gen is not None
|
||||
|
||||
|
|
|
@ -903,6 +903,15 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
self._event_ref.pop(event_id, None)
|
||||
self._current_event_fetches.pop(event_id, None)
|
||||
|
||||
def _invalidate_local_get_event_cache_all(self) -> None:
|
||||
"""Clears the in-memory get event caches.
|
||||
|
||||
Used when we purge room history.
|
||||
"""
|
||||
self._get_event_cache.clear()
|
||||
self._event_ref.clear()
|
||||
self._current_event_fetches.clear()
|
||||
|
||||
async def _get_events_from_cache(
|
||||
self, events: Iterable[str], update_metrics: bool = True
|
||||
) -> Dict[str, EventCacheEntry]:
|
||||
|
|
|
@ -308,6 +308,8 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
|||
|
||||
logger.info("[purge] done")
|
||||
|
||||
self._invalidate_caches_for_room_events_and_stream(txn, room_id)
|
||||
|
||||
return referenced_state_groups
|
||||
|
||||
async def purge_room(self, room_id: str) -> List[int]:
|
||||
|
@ -485,10 +487,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
|||
# index on them. In any case we should be clearing out 'stream' tables
|
||||
# periodically anyway (#5888)
|
||||
|
||||
# TODO: we could probably usefully do a bunch more cache invalidation here
|
||||
|
||||
# XXX: as with purge_history, this is racy, but no worse than other races
|
||||
# that already exist.
|
||||
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
|
||||
self._invalidate_caches_for_room_and_stream(txn, room_id)
|
||||
|
||||
return state_groups
|
||||
|
|
|
@ -862,5 +862,5 @@ class AsyncLruCache(Generic[KT, VT]):
|
|||
async def contains(self, key: KT) -> bool:
|
||||
return self._lru_cache.contains(key)
|
||||
|
||||
async def clear(self) -> None:
|
||||
def clear(self) -> None:
|
||||
self._lru_cache.clear()
|
||||
|
|
|
@ -163,7 +163,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
|||
# Blow away caches (supported room versions can only change due to a restart).
|
||||
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
|
||||
self.store.get_rooms_for_user.invalidate_all()
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
self.store._get_event_cache.clear()
|
||||
self.store._event_ref.clear()
|
||||
|
||||
# The rooms should be excluded from the sync response.
|
||||
|
|
|
@ -131,9 +131,6 @@ class ReadMarkerTestCase(unittest.HomeserverTestCase):
|
|||
event = self.get_success(self.store.get_event(event_id_1, allow_none=True))
|
||||
assert event is None
|
||||
|
||||
# TODO See https://github.com/matrix-org/synapse/issues/13476
|
||||
self.store.get_event_ordering.invalidate_all()
|
||||
|
||||
# Test moving the read marker to a newer event
|
||||
event_id_2 = send_message()
|
||||
channel = self.make_request(
|
||||
|
|
|
@ -188,7 +188,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
|
|||
self.event_id = res["event_id"]
|
||||
|
||||
# Reset the event cache so the tests start with it empty
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
self.store._get_event_cache.clear()
|
||||
|
||||
def test_simple(self) -> None:
|
||||
"""Test that we cache events that we pull from the DB."""
|
||||
|
@ -205,7 +205,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
|
|||
"""
|
||||
|
||||
# Reset the event cache
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
self.store._get_event_cache.clear()
|
||||
|
||||
with LoggingContext("test") as ctx:
|
||||
# We keep hold of the event event though we never use it.
|
||||
|
@ -215,7 +215,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
|
|||
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
|
||||
|
||||
# Reset the event cache
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
self.store._get_event_cache.clear()
|
||||
|
||||
with LoggingContext("test") as ctx:
|
||||
self.get_success(self.store.get_event(self.event_id))
|
||||
|
@ -390,7 +390,7 @@ class GetEventCancellationTestCase(unittest.HomeserverTestCase):
|
|||
self.event_id = res["event_id"]
|
||||
|
||||
# Reset the event cache so the tests start with it empty
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
self.store._get_event_cache.clear()
|
||||
|
||||
@contextmanager
|
||||
def blocking_get_event_calls(
|
||||
|
|
Loading…
Reference in New Issue