Prefill events after invalidate not before when persisting events (#15758)
Fixes #15757
This commit is contained in:
parent
8ddb2de553
commit
21fea6b749
|
@ -0,0 +1 @@
|
||||||
|
Avoid invalidating a cache that was just prefilled.
|
|
@ -1729,13 +1729,22 @@ class PersistEventsStore:
|
||||||
if not row["rejects"] and not row["redacts"]:
|
if not row["rejects"] and not row["redacts"]:
|
||||||
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
|
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
|
||||||
|
|
||||||
async def prefill() -> None:
|
async def external_prefill() -> None:
|
||||||
for cache_entry in to_prefill:
|
for cache_entry in to_prefill:
|
||||||
await self.store._get_event_cache.set(
|
await self.store._get_event_cache.set_external(
|
||||||
(cache_entry.event.event_id,), cache_entry
|
(cache_entry.event.event_id,), cache_entry
|
||||||
)
|
)
|
||||||
|
|
||||||
txn.async_call_after(prefill)
|
def local_prefill() -> None:
|
||||||
|
for cache_entry in to_prefill:
|
||||||
|
self.store._get_event_cache.set_local(
|
||||||
|
(cache_entry.event.event_id,), cache_entry
|
||||||
|
)
|
||||||
|
|
||||||
|
# The order these are called here is not as important as knowing that after the
|
||||||
|
# transaction is finished, the async_call_after will run before the call_after.
|
||||||
|
txn.async_call_after(external_prefill)
|
||||||
|
txn.call_after(local_prefill)
|
||||||
|
|
||||||
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
|
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
|
||||||
assert event.redacts is not None
|
assert event.redacts is not None
|
||||||
|
|
|
@ -883,7 +883,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
|
async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
|
||||||
"""
|
"""
|
||||||
Invalidates an event in the asyncronous get event cache, which may be remote.
|
Invalidates an event in the asynchronous get event cache, which may be remote.
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
event_id: the event ID to invalidate
|
event_id: the event ID to invalidate
|
||||||
|
|
|
@ -842,7 +842,13 @@ class AsyncLruCache(Generic[KT, VT]):
|
||||||
return self._lru_cache.get(key, update_metrics=update_metrics)
|
return self._lru_cache.get(key, update_metrics=update_metrics)
|
||||||
|
|
||||||
async def set(self, key: KT, value: VT) -> None:
|
async def set(self, key: KT, value: VT) -> None:
|
||||||
self._lru_cache.set(key, value)
|
# This will add the entries in the correct order, local first external second
|
||||||
|
self.set_local(key, value)
|
||||||
|
await self.set_external(key, value)
|
||||||
|
|
||||||
|
async def set_external(self, key: KT, value: VT) -> None:
|
||||||
|
# This method should add an entry to any configured external cache, in this case noop.
|
||||||
|
pass
|
||||||
|
|
||||||
def set_local(self, key: KT, value: VT) -> None:
|
def set_local(self, key: KT, value: VT) -> None:
|
||||||
self._lru_cache.set(key, value)
|
self._lru_cache.set(key, value)
|
||||||
|
|
|
@ -139,6 +139,55 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
|
||||||
# That should result in a single db query to lookup
|
# That should result in a single db query to lookup
|
||||||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
|
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
|
||||||
|
|
||||||
|
def test_persisting_event_prefills_get_event_cache(self) -> None:
|
||||||
|
"""
|
||||||
|
Test to make sure that the `_get_event_cache` is prefilled after we persist an
|
||||||
|
event and returns the updated value.
|
||||||
|
"""
|
||||||
|
event, event_context = self.get_success(
|
||||||
|
create_event(
|
||||||
|
self.hs,
|
||||||
|
room_id=self.room_id,
|
||||||
|
sender=self.user,
|
||||||
|
type="test_event_type",
|
||||||
|
content={"body": "conflabulation"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# First, check `_get_event_cache` for the event we just made
|
||||||
|
# to verify it's not in the cache.
|
||||||
|
res = self.store._get_event_cache.get_local((event.event_id,))
|
||||||
|
self.assertEqual(res, None, "Event was cached when it should not have been.")
|
||||||
|
|
||||||
|
with LoggingContext(name="test") as ctx:
|
||||||
|
# Persist the event which should invalidate then prefill the
|
||||||
|
# `_get_event_cache` so we don't return stale values.
|
||||||
|
# Side Note: Apparently, persisting an event isn't a transaction in the
|
||||||
|
# sense that it is recorded in the LoggingContext
|
||||||
|
persistence = self.hs.get_storage_controllers().persistence
|
||||||
|
assert persistence is not None
|
||||||
|
self.get_success(
|
||||||
|
persistence.persist_event(
|
||||||
|
event,
|
||||||
|
event_context,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check `_get_event_cache` again and we should see the updated fact
|
||||||
|
# that we now have the event cached after persisting it.
|
||||||
|
res = self.store._get_event_cache.get_local((event.event_id,))
|
||||||
|
self.assertEqual(res.event, event, "Event not cached as expected.") # type: ignore
|
||||||
|
|
||||||
|
# Try and fetch the event from the database.
|
||||||
|
self.get_success(self.store.get_event(event.event_id))
|
||||||
|
|
||||||
|
# Verify that the database hit was avoided.
|
||||||
|
self.assertEqual(
|
||||||
|
ctx.get_resource_usage().evt_db_fetch_count,
|
||||||
|
0,
|
||||||
|
"Database was hit, which would not happen if event was cached.",
|
||||||
|
)
|
||||||
|
|
||||||
def test_invalidate_cache_by_room_id(self) -> None:
|
def test_invalidate_cache_by_room_id(self) -> None:
|
||||||
"""
|
"""
|
||||||
Test to make sure that all events associated with the given `(room_id,)`
|
Test to make sure that all events associated with the given `(room_id,)`
|
||||||
|
|
Loading…
Reference in New Issue