Handle race between persisting an event and un-partial stating a room (#13100)
Whenever we want to persist an event, we first compute an event context, which includes the state at the event and a flag indicating whether the state is partial. After a lot of processing, we finally try to store the event in the database, which can fail for partial state events when the containing room has been un-partial stated in the meantime. We detect the race as a foreign key constraint failure in the data store layer and turn it into a special `PartialStateConflictError` exception, which makes its way up to the method in which we computed the event context. To make things difficult, the exception needs to cross a replication request: `/fed_send_events` for events coming over federation and `/send_event` for events from clients. We transport the `PartialStateConflictError` as a `409 Conflict` over replication and turn `409`s back into `PartialStateConflictError`s on the worker making the request. All client events go through `EventCreationHandler.handle_new_client_event`, which is called in *a lot* of places. Instead of trying to update all the code which creates client events, we turn the `PartialStateConflictError` into a `429 Too Many Requests` in `EventCreationHandler.handle_new_client_event` and hope that clients take it as a hint to retry their request. On the federation event side, there are 7 places which compute event contexts. 4 of them use outlier event contexts: `FederationEventHandler._auth_and_persist_outliers_inner`, `FederationHandler.do_knock`, `FederationHandler.on_invite_request` and `FederationHandler.do_remotely_reject_invite`. These events won't have the partial state flag, so we do not need to do anything for then. The remaining 3 paths which create events are `FederationEventHandler.process_remote_join`, `FederationEventHandler.on_send_membership_event` and `FederationEventHandler._process_received_pdu`. We can't experience the race in `process_remote_join`, unless we're handling an additional join into a partial state room, which currently blocks, so we make no attempt to handle it correctly. `on_send_membership_event` is only called by `FederationServer._on_send_membership_event`, so we catch the `PartialStateConflictError` there and retry just once. `_process_received_pdu` is called by `on_receive_pdu` for incoming events and `_process_pulled_event` for backfill. The latter should never try to persist partial state events, so we ignore it. We catch the `PartialStateConflictError` in `on_receive_pdu` and retry just once. Refering to the graph of code paths in https://github.com/matrix-org/synapse/issues/12988#issuecomment-1156857648 may make the above make more sense. Signed-off-by: Sean Quah <seanq@matrix.org>
This commit is contained in:
parent
6ba732fefe
commit
68db233f0c
|
@ -0,0 +1 @@
|
||||||
|
Faster room joins: Handle race between persisting an event and un-partial stating a room.
|
|
@ -67,6 +67,7 @@ from synapse.replication.http.federation import (
|
||||||
ReplicationFederationSendEduRestServlet,
|
ReplicationFederationSendEduRestServlet,
|
||||||
ReplicationGetQueryRestServlet,
|
ReplicationGetQueryRestServlet,
|
||||||
)
|
)
|
||||||
|
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||||
from synapse.storage.databases.main.lock import Lock
|
from synapse.storage.databases.main.lock import Lock
|
||||||
from synapse.types import JsonDict, StateMap, get_domain_from_id
|
from synapse.types import JsonDict, StateMap, get_domain_from_id
|
||||||
from synapse.util import json_decoder, unwrapFirstError
|
from synapse.util import json_decoder, unwrapFirstError
|
||||||
|
@ -882,9 +883,20 @@ class FederationServer(FederationBase):
|
||||||
logger.warning("%s", errmsg)
|
logger.warning("%s", errmsg)
|
||||||
raise SynapseError(403, errmsg, Codes.FORBIDDEN)
|
raise SynapseError(403, errmsg, Codes.FORBIDDEN)
|
||||||
|
|
||||||
return await self._federation_event_handler.on_send_membership_event(
|
try:
|
||||||
origin, event
|
return await self._federation_event_handler.on_send_membership_event(
|
||||||
)
|
origin, event
|
||||||
|
)
|
||||||
|
except PartialStateConflictError:
|
||||||
|
# The room was un-partial stated while we were persisting the event.
|
||||||
|
# Try once more, with full state this time.
|
||||||
|
logger.info(
|
||||||
|
"Room %s was un-partial stated during `on_send_membership_event`, trying again.",
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
return await self._federation_event_handler.on_send_membership_event(
|
||||||
|
origin, event
|
||||||
|
)
|
||||||
|
|
||||||
async def on_event_auth(
|
async def on_event_auth(
|
||||||
self, origin: str, room_id: str, event_id: str
|
self, origin: str, room_id: str, event_id: str
|
||||||
|
|
|
@ -45,6 +45,7 @@ from synapse.api.errors import (
|
||||||
FederationDeniedError,
|
FederationDeniedError,
|
||||||
FederationError,
|
FederationError,
|
||||||
HttpResponseException,
|
HttpResponseException,
|
||||||
|
LimitExceededError,
|
||||||
NotFoundError,
|
NotFoundError,
|
||||||
RequestSendFailed,
|
RequestSendFailed,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
|
@ -64,6 +65,7 @@ from synapse.replication.http.federation import (
|
||||||
ReplicationCleanRoomRestServlet,
|
ReplicationCleanRoomRestServlet,
|
||||||
ReplicationStoreRoomOnOutlierMembershipRestServlet,
|
ReplicationStoreRoomOnOutlierMembershipRestServlet,
|
||||||
)
|
)
|
||||||
|
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.types import JsonDict, StateMap, get_domain_from_id
|
from synapse.types import JsonDict, StateMap, get_domain_from_id
|
||||||
|
@ -549,15 +551,29 @@ class FederationHandler:
|
||||||
# https://github.com/matrix-org/synapse/issues/12998
|
# https://github.com/matrix-org/synapse/issues/12998
|
||||||
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
|
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
|
||||||
|
|
||||||
max_stream_id = await self._federation_event_handler.process_remote_join(
|
try:
|
||||||
origin,
|
max_stream_id = (
|
||||||
room_id,
|
await self._federation_event_handler.process_remote_join(
|
||||||
auth_chain,
|
origin,
|
||||||
state,
|
room_id,
|
||||||
event,
|
auth_chain,
|
||||||
room_version_obj,
|
state,
|
||||||
partial_state=ret.partial_state,
|
event,
|
||||||
)
|
room_version_obj,
|
||||||
|
partial_state=ret.partial_state,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except PartialStateConflictError as e:
|
||||||
|
# The homeserver was already in the room and it is no longer partial
|
||||||
|
# stated. We ought to be doing a local join instead. Turn the error into
|
||||||
|
# a 429, as a hint to the client to try again.
|
||||||
|
# TODO(faster_joins): `_should_perform_remote_join` suggests that we may
|
||||||
|
# do a remote join for restricted rooms even if we have full state.
|
||||||
|
logger.error(
|
||||||
|
"Room %s was un-partial stated while processing remote join.",
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
|
||||||
|
|
||||||
if ret.partial_state:
|
if ret.partial_state:
|
||||||
# Kick off the process of asynchronously fetching the state for this
|
# Kick off the process of asynchronously fetching the state for this
|
||||||
|
@ -1567,11 +1583,6 @@ class FederationHandler:
|
||||||
|
|
||||||
# we raced against more events arriving with partial state. Go round
|
# we raced against more events arriving with partial state. Go round
|
||||||
# the loop again. We've already logged a warning, so no need for more.
|
# the loop again. We've already logged a warning, so no need for more.
|
||||||
# TODO(faster_joins): there is still a race here, whereby incoming events which raced
|
|
||||||
# with us will fail to be persisted after the call to `clear_partial_state_room` due to
|
|
||||||
# having partial state.
|
|
||||||
# https://github.com/matrix-org/synapse/issues/12988
|
|
||||||
#
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
events = await self.store.get_events_as_list(
|
events = await self.store.get_events_as_list(
|
||||||
|
|
|
@ -64,6 +64,7 @@ from synapse.replication.http.federation import (
|
||||||
ReplicationFederationSendEventsRestServlet,
|
ReplicationFederationSendEventsRestServlet,
|
||||||
)
|
)
|
||||||
from synapse.state import StateResolutionStore
|
from synapse.state import StateResolutionStore
|
||||||
|
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
|
@ -275,7 +276,16 @@ class FederationEventHandler:
|
||||||
affected=pdu.event_id,
|
affected=pdu.event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
await self._process_received_pdu(origin, pdu, state_ids=None)
|
try:
|
||||||
|
await self._process_received_pdu(origin, pdu, state_ids=None)
|
||||||
|
except PartialStateConflictError:
|
||||||
|
# The room was un-partial stated while we were processing the PDU.
|
||||||
|
# Try once more, with full state this time.
|
||||||
|
logger.info(
|
||||||
|
"Room %s was un-partial stated while processing the PDU, trying again.",
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
await self._process_received_pdu(origin, pdu, state_ids=None)
|
||||||
|
|
||||||
async def on_send_membership_event(
|
async def on_send_membership_event(
|
||||||
self, origin: str, event: EventBase
|
self, origin: str, event: EventBase
|
||||||
|
@ -306,6 +316,9 @@ class FederationEventHandler:
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
SynapseError if the event is not accepted into the room
|
SynapseError if the event is not accepted into the room
|
||||||
|
PartialStateConflictError if the room was un-partial stated in between
|
||||||
|
computing the state at the event and persisting it. The caller should
|
||||||
|
retry exactly once in this case.
|
||||||
"""
|
"""
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"on_send_membership_event: Got event: %s, signatures: %s",
|
"on_send_membership_event: Got event: %s, signatures: %s",
|
||||||
|
@ -423,6 +436,8 @@ class FederationEventHandler:
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
SynapseError if the response is in some way invalid.
|
SynapseError if the response is in some way invalid.
|
||||||
|
PartialStateConflictError if the homeserver is already in the room and it
|
||||||
|
has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
create_event = None
|
create_event = None
|
||||||
for e in state:
|
for e in state:
|
||||||
|
@ -1084,10 +1099,14 @@ class FederationEventHandler:
|
||||||
|
|
||||||
state_ids: Normally None, but if we are handling a gap in the graph
|
state_ids: Normally None, but if we are handling a gap in the graph
|
||||||
(ie, we are missing one or more prev_events), the resolved state at the
|
(ie, we are missing one or more prev_events), the resolved state at the
|
||||||
event
|
event. Must not be partial state.
|
||||||
|
|
||||||
backfilled: True if this is part of a historical batch of events (inhibits
|
backfilled: True if this is part of a historical batch of events (inhibits
|
||||||
notification to clients, and validation of device keys.)
|
notification to clients, and validation of device keys.)
|
||||||
|
|
||||||
|
PartialStateConflictError: if the room was un-partial stated in between
|
||||||
|
computing the state at the event and persisting it. The caller should retry
|
||||||
|
exactly once in this case. Will never be raised if `state_ids` is provided.
|
||||||
"""
|
"""
|
||||||
logger.debug("Processing event: %s", event)
|
logger.debug("Processing event: %s", event)
|
||||||
assert not event.internal_metadata.outlier
|
assert not event.internal_metadata.outlier
|
||||||
|
@ -1933,6 +1952,9 @@ class FederationEventHandler:
|
||||||
event: The event itself.
|
event: The event itself.
|
||||||
context: The event context.
|
context: The event context.
|
||||||
backfilled: True if the event was backfilled.
|
backfilled: True if the event was backfilled.
|
||||||
|
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
# this method should not be called on outliers (those code paths call
|
# this method should not be called on outliers (those code paths call
|
||||||
# persist_events_and_notify directly.)
|
# persist_events_and_notify directly.)
|
||||||
|
@ -1985,6 +2007,10 @@ class FederationEventHandler:
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The stream ID after which all events have been persisted.
|
The stream ID after which all events have been persisted.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
if not event_and_contexts:
|
if not event_and_contexts:
|
||||||
return self._store.get_room_max_stream_ordering()
|
return self._store.get_room_max_stream_ordering()
|
||||||
|
@ -1993,14 +2019,19 @@ class FederationEventHandler:
|
||||||
if instance != self._instance_name:
|
if instance != self._instance_name:
|
||||||
# Limit the number of events sent over replication. We choose 200
|
# Limit the number of events sent over replication. We choose 200
|
||||||
# here as that is what we default to in `max_request_body_size(..)`
|
# here as that is what we default to in `max_request_body_size(..)`
|
||||||
for batch in batch_iter(event_and_contexts, 200):
|
try:
|
||||||
result = await self._send_events(
|
for batch in batch_iter(event_and_contexts, 200):
|
||||||
instance_name=instance,
|
result = await self._send_events(
|
||||||
store=self._store,
|
instance_name=instance,
|
||||||
room_id=room_id,
|
store=self._store,
|
||||||
event_and_contexts=batch,
|
room_id=room_id,
|
||||||
backfilled=backfilled,
|
event_and_contexts=batch,
|
||||||
)
|
backfilled=backfilled,
|
||||||
|
)
|
||||||
|
except SynapseError as e:
|
||||||
|
if e.code == HTTPStatus.CONFLICT:
|
||||||
|
raise PartialStateConflictError()
|
||||||
|
raise
|
||||||
return result["max_stream_id"]
|
return result["max_stream_id"]
|
||||||
else:
|
else:
|
||||||
assert self._storage_controllers.persistence
|
assert self._storage_controllers.persistence
|
||||||
|
|
|
@ -37,6 +37,7 @@ from synapse.api.errors import (
|
||||||
AuthError,
|
AuthError,
|
||||||
Codes,
|
Codes,
|
||||||
ConsentNotGivenError,
|
ConsentNotGivenError,
|
||||||
|
LimitExceededError,
|
||||||
NotFoundError,
|
NotFoundError,
|
||||||
ShadowBanError,
|
ShadowBanError,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
|
@ -53,6 +54,7 @@ from synapse.handlers.directory import DirectoryHandler
|
||||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||||
|
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
|
@ -1250,6 +1252,8 @@ class EventCreationHandler:
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ShadowBanError if the requester has been shadow-banned.
|
ShadowBanError if the requester has been shadow-banned.
|
||||||
|
SynapseError(503) if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
extra_users = extra_users or []
|
extra_users = extra_users or []
|
||||||
|
|
||||||
|
@ -1300,24 +1304,35 @@ class EventCreationHandler:
|
||||||
|
|
||||||
# We now persist the event (and update the cache in parallel, since we
|
# We now persist the event (and update the cache in parallel, since we
|
||||||
# don't want to block on it).
|
# don't want to block on it).
|
||||||
result, _ = await make_deferred_yieldable(
|
try:
|
||||||
gather_results(
|
result, _ = await make_deferred_yieldable(
|
||||||
(
|
gather_results(
|
||||||
run_in_background(
|
(
|
||||||
self._persist_event,
|
run_in_background(
|
||||||
requester=requester,
|
self._persist_event,
|
||||||
event=event,
|
requester=requester,
|
||||||
context=context,
|
event=event,
|
||||||
ratelimit=ratelimit,
|
context=context,
|
||||||
extra_users=extra_users,
|
ratelimit=ratelimit,
|
||||||
|
extra_users=extra_users,
|
||||||
|
),
|
||||||
|
run_in_background(
|
||||||
|
self.cache_joined_hosts_for_event, event, context
|
||||||
|
).addErrback(
|
||||||
|
log_failure, "cache_joined_hosts_for_event failed"
|
||||||
|
),
|
||||||
),
|
),
|
||||||
run_in_background(
|
consumeErrors=True,
|
||||||
self.cache_joined_hosts_for_event, event, context
|
)
|
||||||
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
|
).addErrback(unwrapFirstError)
|
||||||
),
|
except PartialStateConflictError as e:
|
||||||
consumeErrors=True,
|
# The event context needs to be recomputed.
|
||||||
|
# Turn the error into a 429, as a hint to the client to try again.
|
||||||
|
logger.info(
|
||||||
|
"Room %s was un-partial stated while persisting client event.",
|
||||||
|
event.room_id,
|
||||||
)
|
)
|
||||||
).addErrback(unwrapFirstError)
|
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@ -1332,6 +1347,9 @@ class EventCreationHandler:
|
||||||
"""Actually persists the event. Should only be called by
|
"""Actually persists the event. Should only be called by
|
||||||
`handle_new_client_event`, and see its docstring for documentation of
|
`handle_new_client_event`, and see its docstring for documentation of
|
||||||
the arguments.
|
the arguments.
|
||||||
|
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Skip push notification actions for historical messages
|
# Skip push notification actions for historical messages
|
||||||
|
@ -1348,16 +1366,21 @@ class EventCreationHandler:
|
||||||
# If we're a worker we need to hit out to the master.
|
# If we're a worker we need to hit out to the master.
|
||||||
writer_instance = self._events_shard_config.get_instance(event.room_id)
|
writer_instance = self._events_shard_config.get_instance(event.room_id)
|
||||||
if writer_instance != self._instance_name:
|
if writer_instance != self._instance_name:
|
||||||
result = await self.send_event(
|
try:
|
||||||
instance_name=writer_instance,
|
result = await self.send_event(
|
||||||
event_id=event.event_id,
|
instance_name=writer_instance,
|
||||||
store=self.store,
|
event_id=event.event_id,
|
||||||
requester=requester,
|
store=self.store,
|
||||||
event=event,
|
requester=requester,
|
||||||
context=context,
|
event=event,
|
||||||
ratelimit=ratelimit,
|
context=context,
|
||||||
extra_users=extra_users,
|
ratelimit=ratelimit,
|
||||||
)
|
extra_users=extra_users,
|
||||||
|
)
|
||||||
|
except SynapseError as e:
|
||||||
|
if e.code == HTTPStatus.CONFLICT:
|
||||||
|
raise PartialStateConflictError()
|
||||||
|
raise
|
||||||
stream_id = result["stream_id"]
|
stream_id = result["stream_id"]
|
||||||
event_id = result["event_id"]
|
event_id = result["event_id"]
|
||||||
if event_id != event.event_id:
|
if event_id != event.event_id:
|
||||||
|
@ -1485,6 +1508,10 @@ class EventCreationHandler:
|
||||||
The persisted event. This may be different than the given event if
|
The persisted event. This may be different than the given event if
|
||||||
it was de-duplicated (e.g. because we had already persisted an
|
it was de-duplicated (e.g. because we had already persisted an
|
||||||
event with the same transaction ID.)
|
event with the same transaction ID.)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
extra_users = extra_users or []
|
extra_users = extra_users or []
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||||
{
|
{
|
||||||
"max_stream_id": 32443,
|
"max_stream_id": 32443,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Responds with a 409 when a `PartialStateConflictError` is raised due to an event
|
||||||
|
context that needs to be recomputed due to the un-partial stating of a room.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
NAME = "fed_send_events"
|
NAME = "fed_send_events"
|
||||||
|
|
|
@ -59,6 +59,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||||
|
|
||||||
{ "stream_id": 12345, "event_id": "$abcdef..." }
|
{ "stream_id": 12345, "event_id": "$abcdef..." }
|
||||||
|
|
||||||
|
Responds with a 409 when a `PartialStateConflictError` is raised due to an event
|
||||||
|
context that needs to be recomputed due to the un-partial stating of a room.
|
||||||
|
|
||||||
The returned event ID may not match the sent event if it was deduplicated.
|
The returned event ID may not match the sent event if it was deduplicated.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
|
@ -315,6 +315,10 @@ class EventsPersistenceStorageController:
|
||||||
if they were deduplicated due to an event already existing that
|
if they were deduplicated due to an event already existing that
|
||||||
matched the transaction ID; the existing event is returned in such
|
matched the transaction ID; the existing event is returned in such
|
||||||
a case.
|
a case.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
||||||
for event, ctx in events_and_contexts:
|
for event, ctx in events_and_contexts:
|
||||||
|
@ -363,6 +367,10 @@ class EventsPersistenceStorageController:
|
||||||
latest persisted event. The returned event may not match the given
|
latest persisted event. The returned event may not match the given
|
||||||
event if it was deduplicated due to an existing event matching the
|
event if it was deduplicated due to an existing event matching the
|
||||||
transaction ID.
|
transaction ID.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
# add_to_queue returns a map from event ID to existing event ID if the
|
# add_to_queue returns a map from event ID to existing event ID if the
|
||||||
# event was deduplicated. (The dict may also include other entries if
|
# event was deduplicated. (The dict may also include other entries if
|
||||||
|
@ -453,6 +461,10 @@ class EventsPersistenceStorageController:
|
||||||
Returns:
|
Returns:
|
||||||
A dictionary of event ID to event ID we didn't persist as we already
|
A dictionary of event ID to event ID we didn't persist as we already
|
||||||
had another event persisted with the same TXN ID.
|
had another event persisted with the same TXN ID.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
replaced_events: Dict[str, str] = {}
|
replaced_events: Dict[str, str] = {}
|
||||||
if not events_and_contexts:
|
if not events_and_contexts:
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from http import HTTPStatus
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
|
@ -35,6 +36,7 @@ from prometheus_client import Counter
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
|
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
|
||||||
|
from synapse.api.errors import Codes, SynapseError
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.events import EventBase, relation_from_event
|
from synapse.events import EventBase, relation_from_event
|
||||||
from synapse.events.snapshot import EventContext
|
from synapse.events.snapshot import EventContext
|
||||||
|
@ -69,6 +71,24 @@ event_counter = Counter(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class PartialStateConflictError(SynapseError):
|
||||||
|
"""An internal error raised when attempting to persist an event with partial state
|
||||||
|
after the room containing the event has been un-partial stated.
|
||||||
|
|
||||||
|
This error should be handled by recomputing the event context and trying again.
|
||||||
|
|
||||||
|
This error has an HTTP status code so that it can be transported over replication.
|
||||||
|
It should not be exposed to clients.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__(
|
||||||
|
HTTPStatus.CONFLICT,
|
||||||
|
msg="Cannot persist partial state event in un-partial stated room",
|
||||||
|
errcode=Codes.UNKNOWN,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, auto_attribs=True)
|
@attr.s(slots=True, auto_attribs=True)
|
||||||
class DeltaState:
|
class DeltaState:
|
||||||
"""Deltas to use to update the `current_state_events` table.
|
"""Deltas to use to update the `current_state_events` table.
|
||||||
|
@ -154,6 +174,10 @@ class PersistEventsStore:
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Resolves when the events have been persisted
|
Resolves when the events have been persisted
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# We want to calculate the stream orderings as late as possible, as
|
# We want to calculate the stream orderings as late as possible, as
|
||||||
|
@ -354,6 +378,9 @@ class PersistEventsStore:
|
||||||
For each room, a list of the event ids which are the forward
|
For each room, a list of the event ids which are the forward
|
||||||
extremities.
|
extremities.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
state_delta_for_room = state_delta_for_room or {}
|
state_delta_for_room = state_delta_for_room or {}
|
||||||
new_forward_extremities = new_forward_extremities or {}
|
new_forward_extremities = new_forward_extremities or {}
|
||||||
|
@ -1304,6 +1331,10 @@ class PersistEventsStore:
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
new list, without events which are already in the events table.
|
new list, without events which are already in the events table.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
"""
|
"""
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"SELECT event_id, outlier FROM events WHERE event_id in (%s)"
|
"SELECT event_id, outlier FROM events WHERE event_id in (%s)"
|
||||||
|
@ -2215,6 +2246,11 @@ class PersistEventsStore:
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
|
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""
|
||||||
|
Raises:
|
||||||
|
PartialStateConflictError: if attempting to persist a partial state event in
|
||||||
|
a room that has been un-partial stated.
|
||||||
|
"""
|
||||||
state_groups = {}
|
state_groups = {}
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
if event.internal_metadata.is_outlier():
|
if event.internal_metadata.is_outlier():
|
||||||
|
@ -2239,19 +2275,37 @@ class PersistEventsStore:
|
||||||
# if we have partial state for these events, record the fact. (This happens
|
# if we have partial state for these events, record the fact. (This happens
|
||||||
# here rather than in _store_event_txn because it also needs to happen when
|
# here rather than in _store_event_txn because it also needs to happen when
|
||||||
# we de-outlier an event.)
|
# we de-outlier an event.)
|
||||||
self.db_pool.simple_insert_many_txn(
|
try:
|
||||||
txn,
|
self.db_pool.simple_insert_many_txn(
|
||||||
table="partial_state_events",
|
txn,
|
||||||
keys=("room_id", "event_id"),
|
table="partial_state_events",
|
||||||
values=[
|
keys=("room_id", "event_id"),
|
||||||
(
|
values=[
|
||||||
event.room_id,
|
(
|
||||||
event.event_id,
|
event.room_id,
|
||||||
)
|
event.event_id,
|
||||||
for event, ctx in events_and_contexts
|
)
|
||||||
if ctx.partial_state
|
for event, ctx in events_and_contexts
|
||||||
],
|
if ctx.partial_state
|
||||||
)
|
],
|
||||||
|
)
|
||||||
|
except self.db_pool.engine.module.IntegrityError:
|
||||||
|
logger.info(
|
||||||
|
"Cannot persist events %s in rooms %s: room has been un-partial stated",
|
||||||
|
[
|
||||||
|
event.event_id
|
||||||
|
for event, ctx in events_and_contexts
|
||||||
|
if ctx.partial_state
|
||||||
|
],
|
||||||
|
list(
|
||||||
|
{
|
||||||
|
event.room_id
|
||||||
|
for event, ctx in events_and_contexts
|
||||||
|
if ctx.partial_state
|
||||||
|
}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
raise PartialStateConflictError()
|
||||||
|
|
||||||
self.db_pool.simple_upsert_many_txn(
|
self.db_pool.simple_upsert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
|
|
|
@ -1156,19 +1156,25 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||||
return room_servers
|
return room_servers
|
||||||
|
|
||||||
async def clear_partial_state_room(self, room_id: str) -> bool:
|
async def clear_partial_state_room(self, room_id: str) -> bool:
|
||||||
# this can race with incoming events, so we watch out for FK errors.
|
"""Clears the partial state flag for a room.
|
||||||
# TODO(faster_joins): this still doesn't completely fix the race, since the persist process
|
|
||||||
# is not atomic. I fear we need an application-level lock.
|
Args:
|
||||||
# https://github.com/matrix-org/synapse/issues/12988
|
room_id: The room whose partial state flag is to be cleared.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
`True` if the partial state flag has been cleared successfully.
|
||||||
|
|
||||||
|
`False` if the partial state flag could not be cleared because the room
|
||||||
|
still contains events with partial state.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
|
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
except self.db_pool.engine.module.DatabaseError as e:
|
except self.db_pool.engine.module.IntegrityError as e:
|
||||||
# TODO(faster_joins): how do we distinguish between FK errors and other errors?
|
# Assume that any `IntegrityError`s are due to partial state events.
|
||||||
# https://github.com/matrix-org/synapse/issues/12988
|
logger.info(
|
||||||
logger.warning(
|
|
||||||
"Exception while clearing lazy partial-state-room %s, retrying: %s",
|
"Exception while clearing lazy partial-state-room %s, retrying: %s",
|
||||||
room_id,
|
room_id,
|
||||||
e,
|
e,
|
||||||
|
|
Loading…
Reference in New Issue