Merge branch 'develop' into fix/prajjawal-9443

commit 7bacbc499f
changelog.d/16149.misc (new file)

@@ -0,0 +1 @@
+Increase performance of read/write locks.
changelog.d/16159.misc (new file)

@@ -0,0 +1 @@
+Reduce scope of locks when paginating to alleviate DB contention.
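Taken together, the two entries describe one change: pagination and backfill take the purge/pagination lock in shared (read) mode so they can run concurrently, while a room purge takes it in exclusive (write) mode. A minimal in-process sketch of that readers/writer pattern, using plain asyncio (the real Synapse locks are coordinated across workers through database tables, so this illustrates the semantics, not the implementation):

import asyncio

class ReadWriteLock:
    """Toy readers/writer lock: many concurrent readers OR one writer."""

    def __init__(self) -> None:
        self._cond = asyncio.Condition()
        self._readers = 0
        self._writer = False

    async def acquire_read(self) -> None:
        async with self._cond:
            # Readers only wait while a writer holds the lock.
            await self._cond.wait_for(lambda: not self._writer)
            self._readers += 1

    async def release_read(self) -> None:
        async with self._cond:
            self._readers -= 1
            self._cond.notify_all()

    async def acquire_write(self) -> None:
        async with self._cond:
            # A writer waits until there are no readers and no other writer.
            await self._cond.wait_for(lambda: not self._writer and self._readers == 0)
            self._writer = True

    async def release_write(self) -> None:
        async with self._cond:
            self._writer = False
            self._cond.notify_all()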
synapse/handlers/federation.py

@@ -60,6 +60,7 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
 from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
 from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
@@ -152,6 +153,7 @@ class FederationHandler:
         self._device_handler = hs.get_device_handler()
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
         self._notifier = hs.get_notifier()
+        self._worker_locks = hs.get_worker_locks_handler()

         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
@@ -200,7 +202,7 @@
     @trace
     @tag_args
     async def maybe_backfill(
-        self, room_id: str, current_depth: int, limit: int
+        self, room_id: str, current_depth: int, limit: int, record_time: bool = True
     ) -> bool:
         """Checks the database to see if we should backfill before paginating,
         and if so do.
@@ -213,15 +215,19 @@
             limit: The number of events that the pagination request will
                 return. This is used as part of the heuristic to decide if we
                 should back paginate.
+            record_time: Whether to record the time it takes to backfill.

         Returns:
             True if we actually tried to backfill something, otherwise False.
         """
         # Starting the processing time here so we can include the room backfill
         # linearizer lock queue in the timing
-        processing_start_time = self.clock.time_msec()
+        processing_start_time = self.clock.time_msec() if record_time else 0

         async with self._room_backfill.queue(room_id):
-            return await self._maybe_backfill_inner(
-                room_id,
-                current_depth,
+            async with self._worker_locks.acquire_read_write_lock(
+                PURGE_PAGINATION_LOCK_NAME, room_id, write=False
+            ):
+                return await self._maybe_backfill_inner(
+                    room_id,
+                    current_depth,
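In the hunk above, maybe_backfill now wraps _maybe_backfill_inner in the purge/pagination lock taken with write=False, so concurrent paginations and backfills in a room do not serialize against each other; only a purge, which acquires the same lock with write=True, excludes them. A hedged sketch of the async-context-manager shape such a helper can have, reusing the toy ReadWriteLock sketched earlier (the real WorkerLocksHandler persists lock state in the database; the in-memory dictionary keyed by (lock_name, lock_key) is purely illustrative):

from contextlib import asynccontextmanager

_locks: dict[tuple[str, str], ReadWriteLock] = {}

@asynccontextmanager
async def acquire_read_write_lock(lock_name: str, lock_key: str, write: bool):
    # One lock per (name, key) pair, e.g. (PURGE_PAGINATION_LOCK_NAME, room_id).
    lock = _locks.setdefault((lock_name, lock_key), ReadWriteLock())
    if write:
        await lock.acquire_write()
    else:
        await lock.acquire_read()
    try:
        yield
    finally:
        if write:
            await lock.release_write()
        else:
            await lock.release_read()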
@@ -305,12 +311,21 @@
         # of history that extends all the way back to where we are currently paginating
         # and it's within the 100 events that are returned from `/backfill`.
         if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            # Check that we actually have later backfill points, if not just return.
+            have_later_backfill_points = await self.store.get_backfill_points_in_room(
+                room_id=room_id,
+                current_depth=MAX_DEPTH,
+                limit=1,
+            )
+            if not have_later_backfill_points:
+                return False
+
             logger.debug(
                 "_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
             )
             run_as_background_process(
                 "_maybe_backfill_inner_anyway_with_max_depth",
-                self._maybe_backfill_inner,
+                self.maybe_backfill,
                 room_id=room_id,
                 # We use `MAX_DEPTH` so that we find all backfill points next
                 # time (all events are below the `MAX_DEPTH`)
@@ -319,7 +334,7 @@
                 # We don't want to start another timing observation from this
                 # nested recursive call. The top-most call can record the time
                 # overall otherwise the smaller one will throw off the results.
-                processing_start_time=None,
+                record_time=False,
             )
             # We return `False` because we're backfilling in the background and there is
             # no new events immediately for the caller to know about yet.
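The record_time plumbing above completes the signature change from earlier: _maybe_backfill_inner can now re-invoke maybe_backfill in the background with MAX_DEPTH, and only the outermost call should contribute a timing observation, otherwise the nested call's shorter run would skew the results. A minimal sketch of that top-level-only timing pattern (process and observe_duration are illustrative names, not Synapse APIs):

import time

def observe_duration(seconds: float) -> None:
    # Stand-in for a metrics histogram; purely illustrative.
    print(f"backfill took {seconds:.3f}s")

def process(depth: int, record_time: bool = True) -> None:
    # Only the top-level call starts the clock; nested or background
    # re-invocations pass record_time=False and emit no second sample.
    start = time.monotonic() if record_time else 0.0
    # ... do the work, possibly re-running process(max_depth, record_time=False) ...
    if record_time:
        observe_duration(time.monotonic() - start)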
synapse/handlers/pagination.py

@@ -487,9 +487,6 @@ class PaginationHandler:

         room_token = from_token.room_key

-        async with self._worker_locks.acquire_read_write_lock(
-            PURGE_PAGINATION_LOCK_NAME, room_id, write=False
-        ):
         (membership, member_event_id) = (None, None)
         if not use_admin_priviledge:
             (
@@ -604,13 +601,11 @@ class PaginationHandler:
                 or missing_too_many_events
                 or not_enough_events_to_fill_response
             ):
-                did_backfill = (
-                    await self.hs.get_federation_handler().maybe_backfill(
-                        room_id,
-                        curr_topo,
-                        limit=pagin_config.limit,
-                    )
-                )
+                did_backfill = await self.hs.get_federation_handler().maybe_backfill(
+                    room_id,
+                    curr_topo,
+                    limit=pagin_config.limit,
+                )

                 # If we did backfill something, refetch the events from the database to
                 # catch anything new that might have been added since we last fetched.
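The net effect on the pagination path: get_messages no longer holds the purge/pagination read lock across the whole request (the @@ -487 hunk removes it), because maybe_backfill now takes it only around the backfill itself. A before/after sketch of the narrowed lock scope, reusing the acquire_read_write_lock sketch above; the lock-name value and the load_events/backfill_unlocked helpers are hypothetical stand-ins, not Synapse code:

PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"  # illustrative value

async def load_events(room_id: str) -> list:
    return []  # stub standing in for the DB fetch

async def backfill_unlocked(room_id: str) -> None:
    pass  # stub standing in for the backfill work

# Before: the read lock was held across the whole pagination request.
async def paginate_before(room_id: str) -> list:
    async with acquire_read_write_lock(PURGE_PAGINATION_LOCK_NAME, room_id, write=False):
        await backfill_unlocked(room_id)
        return await load_events(room_id)

# After: only the backfill step takes the read lock, so a purge waiting
# on the write side is blocked for a much shorter window.
async def paginate_after(room_id: str) -> list:
    async with acquire_read_write_lock(PURGE_PAGINATION_LOCK_NAME, room_id, write=False):
        await backfill_unlocked(room_id)
    return await load_events(room_id)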

New SQL schema delta:

@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Mark the worker_read_write_locks* tables as UNLOGGED, to increase
+-- performance. This means that we don't replicate the tables, and they get
+-- truncated on a crash. This is acceptable as a) in those cases it's likely
+-- that Synapse needs to be stopped/restarted anyway, and b) the locks are
+-- considered best-effort anyway.
+
+-- We need to remove and recreate the circular foreign key references, as
+-- UNLOGGED tables can't reference normal tables.
+ALTER TABLE worker_read_write_locks_mode DROP CONSTRAINT IF EXISTS worker_read_write_locks_mode_foreign;
+
+ALTER TABLE worker_read_write_locks SET UNLOGGED;
+ALTER TABLE worker_read_write_locks_mode SET UNLOGGED;
+
+ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign
+    FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED;
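Postgres records each table's persistence in pg_class.relpersistence: 'u' for UNLOGGED (writes skip the write-ahead log, which is where the performance win comes from) and 'p' for a normal logged table. A small sketch for verifying the migration took effect, using psycopg2 with a placeholder connection string:

import psycopg2

conn = psycopg2.connect("dbname=synapse")  # placeholder DSN
with conn, conn.cursor() as cur:
    cur.execute(
        "SELECT relname, relpersistence FROM pg_class"
        " WHERE relname IN ('worker_read_write_locks', 'worker_read_write_locks_mode')"
    )
    for name, persistence in cur.fetchall():
        # relpersistence: 'u' = UNLOGGED, 'p' = permanent (logged)
        print(name, "UNLOGGED" if persistence == "u" else "logged")
conn.close()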