Run cache_joined_hosts_for_event in background (#9951)

This commit is contained in:
Erik Johnston 2021-05-12 13:17:11 +01:00 committed by GitHub
parent 63fb220e5f
commit affaffb0ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 5 deletions

1
changelog.d/9951.feature Normal file
View File

@ -0,0 +1 @@
Improve performance of sending events for worker-based deployments using Redis.

View File

@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from canonicaljson import encode_canonical_json
from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
@ -43,14 +44,14 @@ from synapse.events import EventBase
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder
from synapse.util.async_helpers import Linearizer
from synapse.util import json_decoder, json_encoder, log_failure
from synapse.util.async_helpers import Linearizer, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@ -979,9 +980,43 @@ class EventCreationHandler:
logger.exception("Failed to encode content: %r", event.content)
raise
await self.action_generator.handle_push_actions_for_event(event, context)
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
result = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._persist_event,
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
extra_users=extra_users,
),
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
],
consumeErrors=True,
)
).addErrback(unwrapFirstError)
await self.cache_joined_hosts_for_event(event, context)
return result[0]
async def _persist_event(
self,
requester: Requester,
event: EventBase,
context: EventContext,
ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None,
) -> EventBase:
"""Actually persists the event. Should only be called by
`handle_new_client_event`, and see its docstring for documentation of
the arguments.
"""
await self.action_generator.handle_push_actions_for_event(event, context)
try:
# If we're a worker we need to hit out to the master.