diff --git a/changelog.d/9951.feature b/changelog.d/9951.feature new file mode 100644 index 0000000000..96a0e7f09f --- /dev/null +++ b/changelog.d/9951.feature @@ -0,0 +1 @@ +Improve performance of sending events for worker-based deployments using Redis. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5afb7fc261..9f365eb5ad 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple from canonicaljson import encode_canonical_json +from twisted.internet import defer from twisted.internet.interfaces import IDelayedCall from synapse import event_auth @@ -43,14 +44,14 @@ from synapse.events import EventBase from synapse.events.builder import EventBuilder from synapse.events.snapshot import EventContext from synapse.events.validator import EventValidator -from synapse.logging.context import run_in_background +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester -from synapse.util import json_decoder, json_encoder -from synapse.util.async_helpers import Linearizer +from synapse.util import json_decoder, json_encoder, log_failure +from synapse.util.async_helpers import Linearizer, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -979,9 +980,43 @@ class EventCreationHandler: logger.exception("Failed to encode content: %r", event.content) raise - await self.action_generator.handle_push_actions_for_event(event, context) + # We now persist the event (and update the cache in parallel, since we + # don't want to block on it). + result = await make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background( + self._persist_event, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ), + run_in_background( + self.cache_joined_hosts_for_event, event, context + ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), + ], + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) - await self.cache_joined_hosts_for_event(event, context) + return result[0] + + async def _persist_event( + self, + requester: Requester, + event: EventBase, + context: EventContext, + ratelimit: bool = True, + extra_users: Optional[List[UserID]] = None, + ) -> EventBase: + """Actually persists the event. Should only be called by + `handle_new_client_event`, and see its docstring for documentation of + the arguments. + """ + + await self.action_generator.handle_push_actions_for_event(event, context) try: # If we're a worker we need to hit out to the master.