Remove experimental MSC2716 implementation to incrementally import history into existing rooms (#15748)

Context for why we're removing the implementation:

 - https://github.com/matrix-org/matrix-spec-proposals/pull/2716#issuecomment-1487441010
 - https://github.com/matrix-org/matrix-spec-proposals/pull/2716#issuecomment-1504262734

Anyone wanting to continue MSC2716, should also address these leftover tasks: https://github.com/matrix-org/synapse/issues/10737

Closes https://github.com/matrix-org/synapse/issues/10737 in the fact that it is not longer necessary to track those things.
This commit is contained in:
Eric Eastwood 2023-06-16 14:12:24 -05:00 committed by GitHub
parent 2ac6c3bbb5
commit 0f02f0b4da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 36 additions and 2103 deletions

View File

@ -0,0 +1 @@
Remove experimental [MSC2716](https://github.com/matrix-org/matrix-spec-proposals/pull/2716) implementation to incrementally import history into existing rooms.

View File

@ -92,8 +92,6 @@ allow_device_name_lookup_over_federation: true
## Experimental Features ##
experimental_features:
# Enable history backfilling support
msc2716_enabled: true
# client-side support for partial state in /send_join responses
faster_joins: true
# Enable support for polls

View File

@ -244,7 +244,6 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
"^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send",
],
"shared_extra_conf": {},
"worker_extra_conf": "",

View File

@ -232,7 +232,6 @@ information.
^/_matrix/client/v1/rooms/.*/hierarchy$
^/_matrix/client/(v1|unstable)/rooms/.*/relations/
^/_matrix/client/v1/rooms/.*/threads$
^/_matrix/client/unstable/org.matrix.msc2716/rooms/.*/batch_send$
^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$
^/_matrix/client/(r0|v3|unstable)/account/3pid$
^/_matrix/client/(r0|v3|unstable)/account/whoami$

View File

@ -246,10 +246,6 @@ else
else
export PASS_SYNAPSE_COMPLEMENT_DATABASE=sqlite
fi
# The tests for importing historical messages (MSC2716)
# only pass with monoliths, currently.
test_tags="$test_tags,msc2716"
fi
if [[ -n "$ASYNCIO_REACTOR" ]]; then

View File

@ -123,10 +123,6 @@ class EventTypes:
SpaceChild: Final = "m.space.child"
SpaceParent: Final = "m.space.parent"
MSC2716_INSERTION: Final = "org.matrix.msc2716.insertion"
MSC2716_BATCH: Final = "org.matrix.msc2716.batch"
MSC2716_MARKER: Final = "org.matrix.msc2716.marker"
Reaction: Final = "m.reaction"
@ -222,16 +218,6 @@ class EventContentFields:
# Used in m.room.guest_access events.
GUEST_ACCESS: Final = "guest_access"
# Used on normal messages to indicate they were historically imported after the fact
MSC2716_HISTORICAL: Final = "org.matrix.msc2716.historical"
# For "insertion" events to indicate what the next batch ID should be in
# order to connect to it
MSC2716_NEXT_BATCH_ID: Final = "next_batch_id"
# Used on "batch" events to indicate which insertion event it connects to
MSC2716_BATCH_ID: Final = "batch_id"
# For "marker" events
MSC2716_INSERTION_EVENT_REFERENCE: Final = "insertion_event_reference"
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"

View File

@ -91,11 +91,6 @@ class RoomVersion:
# MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
# m.room.membership event with membership 'knock'.
msc2403_knocking: bool
# MSC2716: Adds m.room.power_levels -> content.historical field to control
# whether "insertion", "chunk", "marker" events can be sent
msc2716_historical: bool
# MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events
msc2716_redactions: bool
# MSC3389: Protect relation information from redaction.
msc3389_relation_redactions: bool
# MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of
@ -130,8 +125,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -153,8 +146,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -176,8 +167,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -199,8 +188,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -222,8 +209,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -245,8 +230,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -268,8 +251,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -291,8 +272,6 @@ class RoomVersions:
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -314,8 +293,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=False,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -337,8 +314,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -360,8 +335,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=False,
@ -383,8 +356,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
@ -406,8 +377,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@ -415,29 +384,6 @@ class RoomVersions:
msc3931_push_features=(),
msc3989_redaction_rules=False,
)
MSC2716v4 = RoomVersion(
"org.matrix.msc2716v4",
RoomDisposition.UNSTABLE,
EventFormatVersions.ROOM_V4_PLUS,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2175_implicit_room_creator=False,
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=True,
msc2716_historical=True,
msc2716_redactions=True,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3821_redaction_rules=False,
msc3931_push_features=(),
msc3989_redaction_rules=False,
)
MSC1767v10 = RoomVersion(
# MSC1767 (Extensible Events) based on room version "10"
"org.matrix.msc1767.10",
@ -453,8 +399,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@ -476,8 +420,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@ -500,8 +442,6 @@ class RoomVersions:
msc3083_join_rules=True,
msc3375_redaction_rules=True,
msc2403_knocking=True,
msc2716_historical=False,
msc2716_redactions=False,
msc3389_relation_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
@ -526,7 +466,6 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
RoomVersions.V9,
RoomVersions.MSC3787,
RoomVersions.V10,
RoomVersions.MSC2716v4,
RoomVersions.MSC3989,
RoomVersions.MSC3820opt2,
)

View File

@ -83,7 +83,6 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.registration import RegistrationWorkerStore
from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.room_batch import RoomBatchStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
@ -120,7 +119,6 @@ class GenericWorkerStore(
# the races it creates aren't too bad.
KeyStore,
RoomWorkerStore,
RoomBatchStore,
DirectoryWorkerStore,
PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,

View File

@ -247,9 +247,6 @@ class ExperimentalConfig(Config):
# MSC3026 (busy presence state)
self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
# MSC2716 (importing historical messages)
self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False)
# MSC3244 (room version capabilities)
self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)

View File

@ -339,13 +339,6 @@ def check_state_dependent_auth_rules(
if event.type == EventTypes.Redaction:
check_redaction(event.room_version, event, auth_dict)
if (
event.type == EventTypes.MSC2716_INSERTION
or event.type == EventTypes.MSC2716_BATCH
or event.type == EventTypes.MSC2716_MARKER
):
check_historical(event.room_version, event, auth_dict)
logger.debug("Allowing! %s", event)
@ -365,7 +358,6 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
RoomVersions.V9,
RoomVersions.MSC3787,
RoomVersions.V10,
RoomVersions.MSC2716v4,
RoomVersions.MSC1767v10,
}
@ -823,38 +815,6 @@ def check_redaction(
raise AuthError(403, "You don't have permission to redact events")
def check_historical(
room_version_obj: RoomVersion,
event: "EventBase",
auth_events: StateMap["EventBase"],
) -> None:
"""Check whether the event sender is allowed to send historical related
events like "insertion", "batch", and "marker".
Returns:
None
Raises:
AuthError if the event sender is not allowed to send historical related events
("insertion", "batch", and "marker").
"""
# Ignore the auth checks in room versions that do not support historical
# events
if not room_version_obj.msc2716_historical:
return
user_level = get_user_power_level(event.user_id, auth_events)
historical_level = get_named_level(auth_events, "historical", 100)
if user_level < historical_level:
raise UnstableSpecAuthError(
403,
'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")',
errcode=Codes.INSUFFICIENT_POWER,
)
def _check_power_levels(
room_version_obj: RoomVersion,
event: "EventBase",

View File

@ -198,7 +198,6 @@ class _EventInternalMetadata:
soft_failed: DictProperty[bool] = DictProperty("soft_failed")
proactively_send: DictProperty[bool] = DictProperty("proactively_send")
redacted: DictProperty[bool] = DictProperty("redacted")
historical: DictProperty[bool] = DictProperty("historical")
txn_id: DictProperty[str] = DictProperty("txn_id")
"""The transaction ID, if it was set when the event was created."""
@ -288,14 +287,6 @@ class _EventInternalMetadata:
"""
return self._dict.get("redacted", False)
def is_historical(self) -> bool:
"""Whether this is a historical message.
This is used by the batchsend historical message endpoint and
is needed to and mark the event as backfilled and skip some checks
like push notifications.
"""
return self._dict.get("historical", False)
def is_notifiable(self) -> bool:
"""Whether this event can trigger a push notification"""
return not self.is_outlier() or self.is_out_of_band_membership()

View File

@ -164,21 +164,12 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
if room_version.msc2176_redaction_rules:
add_fields("invite")
if room_version.msc2716_historical:
add_fields("historical")
elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
add_fields("aliases")
elif event_type == EventTypes.RoomHistoryVisibility:
add_fields("history_visibility")
elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
add_fields("redacts")
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION:
add_fields(EventContentFields.MSC2716_NEXT_BATCH_ID)
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH:
add_fields(EventContentFields.MSC2716_BATCH_ID)
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
add_fields(EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE)
# Protect the rel_type and event_id fields under the m.relates_to field.
if room_version.msc3389_relation_redactions:

View File

@ -105,14 +105,12 @@ backfill_processing_before_timer = Histogram(
)
# TODO: We can refactor this away now that there is only one backfill point again
class _BackfillPointType(Enum):
# a regular backwards extremity (ie, an event which we don't yet have, but which
# is referred to by other events in the DAG)
BACKWARDS_EXTREMITY = enum.auto()
# an MSC2716 "insertion event"
INSERTION_PONT = enum.auto()
@attr.s(slots=True, auto_attribs=True, frozen=True)
class _BackfillPoint:
@ -273,32 +271,10 @@ class FederationHandler:
)
]
insertion_events_to_be_backfilled: List[_BackfillPoint] = []
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = [
_BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
room_id=room_id,
current_depth=current_depth,
# We only need to end up with 5 extremities combined with
# the backfill points to make the `/backfill` request ...
# (see the other comment above for more context).
limit=50,
)
]
logger.debug(
"_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
backwards_extremities,
insertion_events_to_be_backfilled,
)
# we now have a list of potential places to backpaginate from. We prefer to
# start with the most recent (ie, max depth), so let's sort the list.
sorted_backfill_points: List[_BackfillPoint] = sorted(
itertools.chain(
backwards_extremities,
insertion_events_to_be_backfilled,
),
backwards_extremities,
key=lambda e: -int(e.depth),
)
@ -411,10 +387,7 @@ class FederationHandler:
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.
if bp.type == _BackfillPointType.INSERTION_PONT:
event_ids_to_check = [bp.event_id]
else:
event_ids_to_check = await self.store.get_successor_events(bp.event_id)
event_ids_to_check = await self.store.get_successor_events(bp.event_id)
events_to_check = await self.store.get_events_as_list(
event_ids_to_check,

View File

@ -601,18 +601,6 @@ class FederationEventHandler:
room_id, [(event, context)]
)
# If we're joining the room again, check if there is new marker
# state indicating that there is new history imported somewhere in
# the DAG. Multiple markers can exist in the current state with
# unique state_keys.
#
# Do this after the state from the remote join was persisted (via
# `persist_events_and_notify`). Otherwise we can run into a
# situation where the create event doesn't exist yet in the
# `current_state_events`
for e in state:
await self._handle_marker_event(origin, e)
return stream_id_after_persist
async def update_state_for_partial_state_event(
@ -915,13 +903,6 @@ class FederationEventHandler:
)
)
# We construct the event lists in source order from `/backfill` response because
# it's a) easiest, but also b) the order in which we process things matters for
# MSC2716 historical batches because many historical events are all at the same
# `depth` and we rely on the tenuous sort that the other server gave us and hope
# they're doing their best. The brittle nature of this ordering for historical
# messages over federation is one of the reasons why we don't want to continue
# on MSC2716 until we have online topological ordering.
events_with_failed_pull_attempts, fresh_events = partition(
new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
)
@ -1460,8 +1441,6 @@ class FederationEventHandler:
await self._run_push_actions_and_persist_event(event, context, backfilled)
await self._handle_marker_event(origin, event)
if backfilled or context.rejected:
return
@ -1559,94 +1538,6 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
@trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
Args:
origin: Origin of the event. Will be called to get the insertion event
marker_event: The event to process
"""
if marker_event.type != EventTypes.MSC2716_MARKER:
# Not a marker event
return
if marker_event.rejected_reason is not None:
# Rejected event
return
# Skip processing a marker event if the room version doesn't
# support it or the event is not from the room creator.
room_version = await self._store.get_room_version(marker_event.room_id)
create_event = await self._store.get_create_event_for_room(marker_event.room_id)
if not room_version.msc2175_implicit_room_creator:
room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
else:
room_creator = create_event.sender
if not room_version.msc2716_historical and (
not self._config.experimental.msc2716_enabled
or marker_event.sender != room_creator
):
return
logger.debug("_handle_marker_event: received %s", marker_event)
insertion_event_id = marker_event.content.get(
EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
)
if insertion_event_id is None:
# Nothing to retrieve then (invalid marker)
return
already_seen_insertion_event = await self._store.have_seen_event(
marker_event.room_id, insertion_event_id
)
if already_seen_insertion_event:
# No need to process a marker again if we have already seen the
# insertion event that it was pointing to
return
logger.debug(
"_handle_marker_event: backfilling insertion event %s", insertion_event_id
)
await self._get_events_and_persist(
origin,
marker_event.room_id,
[insertion_event_id],
)
insertion_event = await self._store.get_event(
insertion_event_id, allow_none=True
)
if insertion_event is None:
logger.warning(
"_handle_marker_event: server %s didn't return insertion event %s for marker %s",
origin,
insertion_event_id,
marker_event.event_id,
)
return
logger.debug(
"_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
insertion_event,
marker_event,
)
await self._store.insert_insertion_extremity(
insertion_event_id, marker_event.room_id
)
logger.debug(
"_handle_marker_event: insertion extremity added for %s from marker event %s",
insertion_event,
marker_event,
)
async def backfill_event_id(
self, destinations: List[str], room_id: str, event_id: str
) -> PulledPduInfo:

View File

@ -60,7 +60,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
Requester,
RoomAlias,
@ -573,7 +572,6 @@ class EventCreationHandler:
state_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
@ -599,7 +597,7 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
cases (previously useful for MSC2716).
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@ -614,13 +612,10 @@ class EventCreationHandler:
If non-None, prev_event_ids must also be provided.
state_event_ids:
The full state at a given event. This is used particularly by the MSC2716
/batch_send endpoint. One use case is with insertion events which float at
the beginning of a historical batch and don't have any `prev_events` to
derive from; we add all of these state events as the explicit state so the
rest of the historical batch can inherit the same state and state_group.
This should normally be left as None, which will cause the auth_event_ids
to be calculated based on the room state at the prev_events.
The full state at a given event. This was previously used particularly
by the MSC2716 /batch_send endpoint. This should normally be left as
None, which will cause the auth_event_ids to be calculated based on the
room state at the prev_events.
require_consent: Whether to check if the requester has
consented to the privacy policy.
@ -629,10 +624,6 @@ class EventCreationHandler:
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@ -717,8 +708,6 @@ class EventCreationHandler:
builder.internal_metadata.outlier = outlier
builder.internal_metadata.historical = historical
event, unpersisted_context = await self.create_new_client_event(
builder=builder,
requester=requester,
@ -947,7 +936,6 @@ class EventCreationHandler:
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
"""
@ -961,19 +949,16 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
cases (previously useful for MSC2716).
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
from the database.
state_event_ids:
The full state at a given event. This is used particularly by the MSC2716
/batch_send endpoint. One use case is with insertion events which float at
the beginning of a historical batch and don't have any `prev_events` to
derive from; we add all of these state events as the explicit state so the
rest of the historical batch can inherit the same state and state_group.
This should normally be left as None, which will cause the auth_event_ids
to be calculated based on the room state at the prev_events.
The full state at a given event. This was previously used particularly
by the MSC2716 /batch_send endpoint. This should normally be left as
None, which will cause the auth_event_ids to be calculated based on the
room state at the prev_events.
ratelimit: Whether to rate limit this send.
txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to
@ -981,9 +966,6 @@ class EventCreationHandler:
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@ -1053,7 +1035,6 @@ class EventCreationHandler:
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
outlier=outlier,
historical=historical,
depth=depth,
)
context = await unpersisted_context.persist(event)
@ -1145,7 +1126,7 @@ class EventCreationHandler:
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
cases (previously useful for MSC2716).
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@ -1158,13 +1139,10 @@ class EventCreationHandler:
based on the room state at the prev_events.
state_event_ids:
The full state at a given event. This is used particularly by the MSC2716
/batch_send endpoint. One use case is with insertion events which float at
the beginning of a historical batch and don't have any `prev_events` to
derive from; we add all of these state events as the explicit state so the
rest of the historical batch can inherit the same state and state_group.
This should normally be left as None, which will cause the auth_event_ids
to be calculated based on the room state at the prev_events.
The full state at a given event. This was previously used particularly
by the MSC2716 /batch_send endpoint. This should normally be left as
None, which will cause the auth_event_ids to be calculated based on the
room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
@ -1261,52 +1239,6 @@ class EventCreationHandler:
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = True
context = EventContext.for_outlier(self._storage_controllers)
elif (
event.type == EventTypes.MSC2716_INSERTION
and state_event_ids
and builder.internal_metadata.is_historical()
):
# Add explicit state to the insertion event so it has state to derive
# from even though it's floating with no `prev_events`. The rest of
# the batch can derive from this state and state_group.
#
# TODO(faster_joins): figure out how this works, and make sure that the
# old state is complete.
# https://github.com/matrix-org/synapse/issues/13003
metadata = await self.store.get_metadata_for_events(state_event_ids)
state_map_for_event: MutableStateMap[str] = {}
for state_id in state_event_ids:
data = metadata.get(state_id)
if data is None:
# We're trying to persist a new historical batch of events
# with the given state, e.g. via
# `RoomBatchSendEventRestServlet`. The state can be inferred
# by Synapse or set directly by the client.
#
# Either way, we should have persisted all the state before
# getting here.
raise Exception(
f"State event {state_id} not found in DB,"
" Synapse should have persisted it before using it."
)
if data.state_key is None:
raise Exception(
f"Trying to set non-state event {state_id} as state"
)
state_map_for_event[(data.event_type, data.state_key)] = state_id
# TODO(faster_joins): check how MSC2716 works and whether we can have
# partial state here
# https://github.com/matrix-org/synapse/issues/13003
context = await self.state.calculate_context_info(
event,
state_ids_before_event=state_map_for_event,
partial_state=False,
)
else:
context = await self.state.calculate_context_info(event)
@ -1876,28 +1808,6 @@ class EventCreationHandler:
403, "Redacting server ACL events is not permitted"
)
# Add a little safety stop-gap to prevent people from trying to
# redact MSC2716 related events when they're in a room version
# which does not support it yet. We allow people to use MSC2716
# events in existing room versions but only from the room
# creator since it does not require any changes to the auth
# rules and in effect, the redaction algorithm . In the
# supported room version, we add the `historical` power level to
# auth the MSC2716 related events and adjust the redaction
# algorthim to keep the `historical` field around (redacting an
# event should only strip fields which don't affect the
# structural protocol level).
is_msc2716_event = (
original_event.type == EventTypes.MSC2716_INSERTION
or original_event.type == EventTypes.MSC2716_BATCH
or original_event.type == EventTypes.MSC2716_MARKER
)
if not room_version_obj.msc2716_historical and is_msc2716_event:
raise AuthError(
403,
"Redacting MSC2716 events is not supported in this room version",
)
event_types = event_auth.auth_types_for_event(event.room_version, event)
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types(event_types)
@ -1935,58 +1845,12 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
if event.type == EventTypes.MSC2716_INSERTION:
room_version = await self.store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
create_event = await self.store.get_create_event_for_room(event.room_id)
if not room_version_obj.msc2175_implicit_room_creator:
room_creator = create_event.content.get(
EventContentFields.ROOM_CREATOR
)
else:
room_creator = create_event.sender
# Only check an insertion event if the room version
# supports it or the event is from the room creator.
if room_version_obj.msc2716_historical or (
self.config.experimental.msc2716_enabled
and event.sender == room_creator
):
next_batch_id = event.content.get(
EventContentFields.MSC2716_NEXT_BATCH_ID
)
conflicting_insertion_event_id = None
if next_batch_id:
conflicting_insertion_event_id = (
await self.store.get_insertion_event_id_by_batch_id(
event.room_id, next_batch_id
)
)
if conflicting_insertion_event_id is not None:
# The current insertion event that we're processing is invalid
# because an insertion event already exists in the room with the
# same next_batch_id. We can't allow multiple because the batch
# pointing will get weird, e.g. we can't determine which insertion
# event the batch event is pointing to.
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Another insertion event already exists with the same next_batch_id",
errcode=Codes.INVALID_PARAM,
)
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True
assert self._storage_controllers.persistence is not None
(
persisted_events,
max_stream_token,
) = await self._storage_controllers.persistence.persist_events(
events_and_context, backfilled=backfilled
events_and_context,
)
events_and_pos = []

View File

@ -1,466 +0,0 @@
import logging
from typing import TYPE_CHECKING, List, Tuple
from synapse.api.constants import EventContentFields, EventTypes
from synapse.appservice import ApplicationService
from synapse.http.servlet import assert_params_in_dict
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class RoomBatchHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self._state_storage_controller = hs.get_storage_controllers().state
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int:
"""Finds the depth which would sort it after the most-recent
prev_event_id but before the successors of those events. If no
successors are found, we assume it's an historical extremity part of the
current batch and use the same depth of the prev_event_ids.
Args:
prev_event_ids: List of prev event IDs
Returns:
Inherited depth
"""
(
most_recent_prev_event_id,
most_recent_prev_event_depth,
) = await self.store.get_max_depth_of(prev_event_ids)
# We want to insert the historical event after the `prev_event` but before the successor event
#
# We inherit depth from the successor event instead of the `prev_event`
# because events returned from `/messages` are first sorted by `topological_ordering`
# which is just the `depth` and then tie-break with `stream_ordering`.
#
# We mark these inserted historical events as "backfilled" which gives them a
# negative `stream_ordering`. If we use the same depth as the `prev_event`,
# then our historical event will tie-break and be sorted before the `prev_event`
# when it should come after.
#
# We want to use the successor event depth so they appear after `prev_event` because
# it has a larger `depth` but before the successor event because the `stream_ordering`
# is negative before the successor event.
assert most_recent_prev_event_id is not None
successor_event_ids = await self.store.get_successor_events(
most_recent_prev_event_id
)
# If we can't find any successor events, then it's a forward extremity of
# historical messages and we can just inherit from the previous historical
# event which we can already assume has the correct depth where we want
# to insert into.
if not successor_event_ids:
depth = most_recent_prev_event_depth
else:
(
_,
oldest_successor_depth,
) = await self.store.get_min_depth_of(successor_event_ids)
depth = oldest_successor_depth
return depth
def create_insertion_event_dict(
self, sender: str, room_id: str, origin_server_ts: int
) -> JsonDict:
"""Creates an event dict for an "insertion" event with the proper fields
and a random batch ID.
Args:
sender: The event author MXID
room_id: The room ID that the event belongs to
origin_server_ts: Timestamp when the event was sent
Returns:
The new event dictionary to insert.
"""
next_batch_id = random_string(8)
insertion_event = {
"type": EventTypes.MSC2716_INSERTION,
"sender": sender,
"room_id": room_id,
"content": {
EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
"origin_server_ts": origin_server_ts,
}
return insertion_event
async def create_requester_for_user_id_from_app_service(
self, user_id: str, app_service: ApplicationService
) -> Requester:
"""Creates a new requester for the given user_id
and validates that the app service is allowed to control
the given user.
Args:
user_id: The author MXID that the app service is controlling
app_service: The app service that controls the user
Returns:
Requester object
"""
await self.auth.validate_appservice_can_control_user_id(app_service, user_id)
return create_requester(user_id, app_service=app_service)
async def get_most_recent_full_state_ids_from_event_id_list(
self, event_ids: List[str]
) -> List[str]:
"""Find the most recent event_id and grab the full state at that event.
We will use this as a base to auth our historical messages against.
Args:
event_ids: List of event ID's to look at
Returns:
List of event ID's
"""
(
most_recent_event_id,
_,
) = await self.store.get_max_depth_of(event_ids)
# mapping from (type, state_key) -> state_event_id
assert most_recent_event_id is not None
prev_state_map = await self._state_storage_controller.get_state_ids_for_event(
most_recent_event_id
)
# List of state event ID's
full_state_ids = list(prev_state_map.values())
return full_state_ids
async def persist_state_events_at_start(
self,
state_events_at_start: List[JsonDict],
room_id: str,
initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> List[str]:
"""Takes all `state_events_at_start` event dictionaries and creates/persists
them in a floating state event chain which don't resolve into the current room
state. They are floating because they reference no prev_events which disconnects
them from the normal DAG.
Args:
state_events_at_start:
room_id: Room where you want the events persisted in.
initial_state_event_ids:
The base set of state for the historical batch which the floating
state chain will derive from. This should probably be the state
from the `prev_event` defined by `/batch_send?prev_event_id=$abc`.
app_service_requester: The requester of an application service.
Returns:
List of state event ID's we just persisted
"""
assert app_service_requester.app_service
state_event_ids_at_start = []
state_event_ids = initial_state_event_ids.copy()
# Make the state events float off on their own by specifying no
# prev_events for the first one in the chain so we don't have a bunch of
# `@mxid joined the room` noise between each batch.
prev_event_ids_for_state_chain: List[str] = []
for index, state_event in enumerate(state_events_at_start):
assert_params_in_dict(
state_event, ["type", "origin_server_ts", "content", "sender"]
)
logger.debug(
"RoomBatchSendEventRestServlet inserting state_event=%s", state_event
)
event_dict = {
"type": state_event["type"],
"origin_server_ts": state_event["origin_server_ts"],
"content": state_event["content"],
"room_id": room_id,
"sender": state_event["sender"],
"state_key": state_event["state_key"],
}
# Mark all events as historical
event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
# TODO: This is pretty much the same as some other code to handle inserting state in this file
if event_dict["type"] == EventTypes.Member:
membership = event_dict["content"].get("membership", None)
event_id, _ = await self.room_member_handler.update_membership(
await self.create_requester_for_user_id_from_app_service(
state_event["sender"], app_service_requester.app_service
),
target=UserID.from_string(event_dict["state_key"]),
room_id=room_id,
action=membership,
content=event_dict["content"],
historical=True,
# Only the first event in the state chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# The first event in the state chain is floating with no
# `prev_events` which means it can't derive state from
# anywhere automatically. So we need to set some state
# explicitly.
#
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append
# later.
state_event_ids=state_event_ids.copy(),
)
else:
(
event,
_,
) = await self.event_creation_handler.create_and_send_nonmember_event(
await self.create_requester_for_user_id_from_app_service(
state_event["sender"], app_service_requester.app_service
),
event_dict,
historical=True,
# Only the first event in the state chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# The first event in the state chain is floating with no
# `prev_events` which means it can't derive state from
# anywhere automatically. So we need to set some state
# explicitly.
#
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
state_event_ids=state_event_ids.copy(),
)
event_id = event.event_id
state_event_ids_at_start.append(event_id)
state_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_ids_for_state_chain = [event_id]
return state_event_ids_at_start
async def persist_historical_events(
self,
events_to_create: List[JsonDict],
room_id: str,
inherited_depth: int,
initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> List[str]:
"""Create and persists all events provided sequentially. Handles the
complexity of creating events in chronological order so they can
reference each other by prev_event but still persists in
reverse-chronoloical order so they have the correct
(topological_ordering, stream_ordering) and sort correctly from
/messages.
Args:
events_to_create: List of historical events to create in JSON
dictionary format.
room_id: Room where you want the events persisted in.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
initial_state_event_ids:
This is used to set explicit state for the insertion event at
the start of the historical batch since it's floating with no
prev_events to derive state from automatically.
app_service_requester: The requester of an application service.
Returns:
List of persisted event IDs
"""
assert app_service_requester.app_service
# We expect the first event in a historical batch to be an insertion event
assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
# We expect the last event in a historical batch to be an batch event
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
prev_event_ids: List[str] = []
event_ids = []
events_to_persist = []
for index, ev in enumerate(events_to_create):
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
ev["sender"],
)
event_dict = {
"type": ev["type"],
"origin_server_ts": ev["origin_server_ts"],
"content": ev["content"],
"room_id": room_id,
"sender": ev["sender"], # requester.user.to_string(),
"prev_events": prev_event_ids.copy(),
}
# Mark all events as historical
event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
event, unpersisted_context = await self.event_creation_handler.create_event(
await self.create_requester_for_user_id_from_app_service(
ev["sender"], app_service_requester.app_service
),
event_dict,
# Only the first event (which is the insertion event) in the
# chain should be floating. The rest should hang off each other
# in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=event_dict.get("prev_events"),
# Since the first event (which is the insertion event) in the
# chain is floating with no `prev_events`, it can't derive state
# from anywhere automatically. So we need to set some state
# explicitly.
state_event_ids=initial_state_event_ids if index == 0 else None,
historical=True,
depth=inherited_depth,
)
context = await unpersisted_context.persist(event)
assert context._state_group
# Normally this is done when persisting the event but we have to
# pre-emptively do it here because we create all the events first,
# then persist them in another pass below. And we want to share
# state_groups across the whole batch so this lookup needs to work
# for the next event in the batch in this loop.
await self.store.store_state_group_id_for_event_id(
event_id=event.event_id,
state_group_id=context._state_group,
)
logger.debug(
"RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s",
event,
prev_event_ids,
)
events_to_persist.append((event, context))
event_id = event.event_id
event_ids.append(event_id)
prev_event_ids = [event_id]
# Persist events in reverse-chronological order so they have the
# correct stream_ordering as they are backfilled (which decrements).
# Events are sorted by (topological_ordering, stream_ordering)
# where topological_ordering is just depth.
for event, context in reversed(events_to_persist):
# This call can't raise `PartialStateConflictError` since we forbid
# use of the historical batch API during partial state
await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event.sender, app_service_requester.app_service
),
events_and_context=[(event, context)],
)
return event_ids
async def handle_batch_of_events(
self,
events_to_create: List[JsonDict],
room_id: str,
batch_id_to_connect_to: str,
inherited_depth: int,
initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> Tuple[List[str], str]:
"""
Handles creating and persisting all of the historical events as well as
insertion and batch meta events to make the batch navigable in the DAG.
Args:
events_to_create: List of historical events to create in JSON
dictionary format.
room_id: Room where you want the events created in.
batch_id_to_connect_to: The batch_id from the insertion event you
want this batch to connect to.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
initial_state_event_ids:
This is used to set explicit state for the insertion event at
the start of the historical batch since it's floating with no
prev_events to derive state from automatically. This should
probably be the state from the `prev_event` defined by
`/batch_send?prev_event_id=$abc` plus the outcome of
`persist_state_events_at_start`
app_service_requester: The requester of an application service.
Returns:
Tuple containing a list of created events and the next_batch_id
"""
# Connect this current batch to the insertion event from the previous batch
last_event_in_batch = events_to_create[-1]
batch_event = {
"type": EventTypes.MSC2716_BATCH,
"sender": app_service_requester.user.to_string(),
"room_id": room_id,
"content": {
EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to,
EventContentFields.MSC2716_HISTORICAL: True,
},
# Since the batch event is put at the end of the batch,
# where the newest-in-time event is, copy the origin_server_ts from
# the last event we're inserting
"origin_server_ts": last_event_in_batch["origin_server_ts"],
}
# Add the batch event to the end of the batch (newest-in-time)
events_to_create.append(batch_event)
# Add an "insertion" event to the start of each batch (next to the oldest-in-time
# event in the batch) so the next batch can be connected to this one.
insertion_event = self.create_insertion_event_dict(
sender=app_service_requester.user.to_string(),
room_id=room_id,
# Since the insertion event is put at the start of the batch,
# where the oldest-in-time event is, copy the origin_server_ts from
# the first event we're inserting
origin_server_ts=events_to_create[0]["origin_server_ts"],
)
next_batch_id = insertion_event["content"][
EventContentFields.MSC2716_NEXT_BATCH_ID
]
# Prepend the insertion event to the start of the batch (oldest-in-time)
events_to_create = [insertion_event] + events_to_create
# Create and persist all of the historical events
event_ids = await self.persist_historical_events(
events_to_create=events_to_create,
room_id=room_id,
inherited_depth=inherited_depth,
initial_state_event_ids=initial_state_event_ids,
app_service_requester=app_service_requester,
)
return event_ids, next_batch_id

View File

@ -362,7 +362,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content: Optional[dict] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
origin_server_ts: Optional[int] = None,
) -> Tuple[str, int]:
"""
@ -378,16 +377,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
cases (previously useful for MSC2716).
prev_event_ids: The event IDs to use as the prev events
state_event_ids:
The full state at a given event. This is used particularly by the MSC2716
/batch_send endpoint. One use case is the historical `state_events_at_start`;
since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
have any `state_ids` set and therefore can't derive any state even though the
prev_events are set so we need to set them ourself via this argument.
This should normally be left as None, which will cause the auth_event_ids
to be calculated based on the room state at the prev_events.
The full state at a given event. This was previously used particularly
by the MSC2716 /batch_send endpoint. This should normally be left as
None, which will cause the auth_event_ids to be calculated based on the
room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@ -400,9 +396,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
origin_server_ts: The origin_server_ts to use if a new event is created. Uses
the current timestamp if set to None.
@ -477,7 +470,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
depth=depth,
require_consent=require_consent,
outlier=outlier,
historical=historical,
)
context = await unpersisted_context.persist(event)
prev_state_ids = await context.get_prev_state_ids(
@ -585,7 +577,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
@ -610,22 +601,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
cases (previously useful for MSC2716).
prev_event_ids: The event IDs to use as the prev events
state_event_ids:
The full state at a given event. This is used particularly by the MSC2716
/batch_send endpoint. One use case is the historical `state_events_at_start`;
since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
have any `state_ids` set and therefore can't derive any state even though the
prev_events are set so we need to set them ourself via this argument.
This should normally be left as None, which will cause the auth_event_ids
to be calculated based on the room state at the prev_events.
The full state at a given event. This was previously used particularly
by the MSC2716 /batch_send endpoint. This should normally be left as
None, which will cause the auth_event_ids to be calculated based on the
room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@ -667,7 +652,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
historical=historical,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
@ -691,7 +675,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
@ -718,22 +701,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
allow_no_prev_events: Whether to allow this event to be created an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
cases (previously useful for MSC2716).
prev_event_ids: The event IDs to use as the prev events
state_event_ids:
The full state at a given event. This is used particularly by the MSC2716
/batch_send endpoint. One use case is the historical `state_events_at_start`;
since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
have any `state_ids` set and therefore can't derive any state even though the
prev_events are set so we need to set them ourself via this argument.
This should normally be left as None, which will cause the auth_event_ids
to be calculated based on the room state at the prev_events.
The full state at a given event. This was previously used particularly
by the MSC2716 /batch_send endpoint. This should normally be left as
None, which will cause the auth_event_ids to be calculated based on the
room state at the prev_events.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@ -877,7 +854,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content=content,
require_consent=require_consent,
outlier=outlier,
historical=historical,
origin_server_ts=origin_server_ts,
)

View File

@ -322,7 +322,6 @@ class BulkPushRuleEvaluator:
) -> None:
if (
not event.internal_metadata.is_notifiable()
or event.internal_metadata.is_historical()
or event.room_id in self.hs.config.server.rooms_to_exclude_from_sync
):
# Push rules for events that aren't notifiable can't be processed by this and

View File

@ -48,7 +48,6 @@ from synapse.rest.client import (
rendezvous,
report_event,
room,
room_batch,
room_keys,
room_upgrade_rest_servlet,
sendtodevice,
@ -132,7 +131,6 @@ class ClientRestResource(JsonResource):
user_directory.register_servlets(hs, client_resource)
if is_main_process:
room_upgrade_rest_servlet.register_servlets(hs, client_resource)
room_batch.register_servlets(hs, client_resource)
capabilities.register_servlets(hs, client_resource)
if is_main_process:
account_validity.register_servlets(hs, client_resource)

View File

@ -1,254 +0,0 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple
from synapse.api.constants import EventContentFields
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
parse_string,
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class RoomBatchSendEventRestServlet(RestServlet):
"""
API endpoint which can insert a batch of events historically back in time
next to the given `prev_event`.
`batch_id` comes from `next_batch_id `in the response of the batch send
endpoint and is derived from the "insertion" events added to each batch.
It's not required for the first batch send.
`state_events_at_start` is used to define the historical state events
needed to auth the events like join events. These events will float
outside of the normal DAG as outlier's and won't be visible in the chat
history which also allows us to insert multiple batches without having a bunch
of `@mxid joined the room` noise between each batch.
`events` is chronological list of events you want to insert.
There is a reverse-chronological constraint on batches so once you insert
some messages, you can only insert older ones after that.
tldr; Insert batches from your most recent history -> oldest history.
POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&batch_id=<batchID>
{
"events": [ ... ],
"state_events_at_start": [ ... ]
}
"""
PATTERNS = (
re.compile(
"^/_matrix/client/unstable/org.matrix.msc2716"
"/rooms/(?P<room_id>[^/]*)/batch_send$"
),
)
CATEGORY = "Client API requests"
def __init__(self, hs: "HomeServer"):
super().__init__()
self.store = hs.get_datastores().main
self.event_creation_handler = hs.get_event_creation_handler()
self.auth = hs.get_auth()
self.room_batch_handler = hs.get_room_batch_handler()
async def on_POST(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=False)
if not requester.app_service:
raise AuthError(
HTTPStatus.FORBIDDEN,
"Only application services can use the /batchsend endpoint",
)
body = parse_json_object_from_request(request)
assert_params_in_dict(body, ["state_events_at_start", "events"])
assert request.args is not None
prev_event_ids_from_query = parse_strings_from_args(
request.args, "prev_event_id"
)
batch_id_from_query = parse_string(request, "batch_id")
if prev_event_ids_from_query is None:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"prev_event query parameter is required when inserting historical messages back in time",
errcode=Codes.MISSING_PARAM,
)
if await self.store.is_partial_state_room(room_id):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Cannot insert history batches until we have fully joined the room",
errcode=Codes.UNABLE_DUE_TO_PARTIAL_STATE,
)
# Verify the batch_id_from_query corresponds to an actual insertion event
# and have the batch connected.
if batch_id_from_query:
corresponding_insertion_event_id = (
await self.store.get_insertion_event_id_by_batch_id(
room_id, batch_id_from_query
)
)
if corresponding_insertion_event_id is None:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"No insertion event corresponds to the given ?batch_id",
errcode=Codes.INVALID_PARAM,
)
# Make sure that the prev_event_ids exist and aren't outliers - ie, they are
# regular parts of the room DAG where we know the state.
non_outlier_prev_events = await self.store.have_events_in_timeline(
prev_event_ids_from_query
)
for prev_event_id in prev_event_ids_from_query:
if prev_event_id not in non_outlier_prev_events:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"prev_event %s does not exist, or is an outlier" % (prev_event_id,),
errcode=Codes.INVALID_PARAM,
)
# For the event we are inserting next to (`prev_event_ids_from_query`),
# find the most recent state events that allowed that message to be
# sent. We will use that as a base to auth our historical messages
# against.
state_event_ids = await self.room_batch_handler.get_most_recent_full_state_ids_from_event_id_list(
prev_event_ids_from_query
)
state_event_ids_at_start = []
# Create and persist all of the state events that float off on their own
# before the batch. These will most likely be all of the invite/member
# state events used to auth the upcoming historical messages.
if body["state_events_at_start"]:
state_event_ids_at_start = (
await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"],
room_id=room_id,
initial_state_event_ids=state_event_ids,
app_service_requester=requester,
)
)
# Update our ongoing auth event ID list with all of the new state we
# just created
state_event_ids.extend(state_event_ids_at_start)
inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids(
prev_event_ids_from_query
)
events_to_create = body["events"]
# Figure out which batch to connect to. If they passed in
# batch_id_from_query let's use it. The batch ID passed in comes
# from the batch_id in the "insertion" event from the previous batch.
last_event_in_batch = events_to_create[-1]
base_insertion_event = None
if batch_id_from_query:
batch_id_to_connect_to = batch_id_from_query
# Otherwise, create an insertion event to act as a starting point.
#
# We don't always have an insertion event to start hanging more history
# off of (ideally there would be one in the main DAG, but that's not the
# case if we're wanting to add history to e.g. existing rooms without
# an insertion event), in which case we just create a new insertion event
# that can then get pointed to by a "marker" event later.
else:
base_insertion_event_dict = (
self.room_batch_handler.create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
origin_server_ts=last_event_in_batch["origin_server_ts"],
)
)
base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy()
(
base_insertion_event,
_,
) = await self.event_creation_handler.create_and_send_nonmember_event(
await self.room_batch_handler.create_requester_for_user_id_from_app_service(
base_insertion_event_dict["sender"],
requester.app_service,
),
base_insertion_event_dict,
prev_event_ids=base_insertion_event_dict.get("prev_events"),
# Also set the explicit state here because we want to resolve
# any `state_events_at_start` here too. It's not strictly
# necessary to accomplish anything but if someone asks for the
# state at this point, we probably want to show them the
# historical state that was part of this batch.
state_event_ids=state_event_ids,
historical=True,
depth=inherited_depth,
)
batch_id_to_connect_to = base_insertion_event.content[
EventContentFields.MSC2716_NEXT_BATCH_ID
]
# Create and persist all of the historical events as well as insertion
# and batch meta events to make the batch navigable in the DAG.
event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
events_to_create=events_to_create,
room_id=room_id,
batch_id_to_connect_to=batch_id_to_connect_to,
inherited_depth=inherited_depth,
initial_state_event_ids=state_event_ids,
app_service_requester=requester,
)
insertion_event_id = event_ids[0]
batch_event_id = event_ids[-1]
historical_event_ids = event_ids[1:-1]
response_dict = {
"state_event_ids": state_event_ids_at_start,
"event_ids": historical_event_ids,
"next_batch_id": next_batch_id,
"insertion_event_id": insertion_event_id,
"batch_event_id": batch_event_id,
}
if base_insertion_event is not None:
response_dict["base_insertion_event_id"] = base_insertion_event.event_id
return HTTPStatus.OK, response_dict
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
msc2716_enabled = hs.config.experimental.msc2716_enabled
if msc2716_enabled:
RoomBatchSendEventRestServlet(hs).register(http_server)

View File

@ -102,8 +102,6 @@ class VersionsRestServlet(RestServlet):
"org.matrix.msc2285.stable": True, # TODO: Remove when MSC2285 becomes a part of the spec
# Supports filtering of /publicRooms by room type as per MSC3827
"org.matrix.msc3827.stable": True,
# Adds support for importing historical messages as per MSC2716
"org.matrix.msc2716": self.config.experimental.msc2716_enabled,
# Adds support for thread relations, per MSC3440.
"org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above
# Support for thread read receipts & notification counts.

View File

@ -91,7 +91,6 @@ from synapse.handlers.room import (
RoomShutdownHandler,
TimestampLookupHandler,
)
from synapse.handlers.room_batch import RoomBatchHandler
from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import (
RoomForgetterHandler,
@ -492,10 +491,6 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_room_creation_handler(self) -> RoomCreationHandler:
return RoomCreationHandler(self)
@cache_in_self
def get_room_batch_handler(self) -> RoomBatchHandler:
return RoomBatchHandler(self)
@cache_in_self
def get_room_shutdown_handler(self) -> RoomShutdownHandler:
return RoomShutdownHandler(self)

View File

@ -61,7 +61,6 @@ from .registration import RegistrationStore
from .rejections import RejectionsStore
from .relations import RelationsStore
from .room import RoomStore
from .room_batch import RoomBatchStore
from .roommember import RoomMemberStore
from .search import SearchStore
from .session import SessionStore
@ -87,7 +86,6 @@ class DataStore(
DeviceStore,
RoomMemberStore,
RoomStore,
RoomBatchStore,
RegistrationStore,
ProfileStore,
PresenceStore,

View File

@ -31,7 +31,7 @@ from typing import (
import attr
from prometheus_client import Counter, Gauge
from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id,
)
@trace
async def get_insertion_event_backward_extremities_in_room(
self,
room_id: str,
current_depth: int,
limit: int,
) -> List[Tuple[str, int]]:
"""
Get the insertion events we know about that we haven't backfilled yet
along with the approximate depth. Only returns insertion events that are
at a depth lower than or equal to the `current_depth`. Sorted by depth,
highest to lowest (descending) so the closest events to the
`current_depth` are first in the list.
We ignore insertion events that are newer than the user's current scroll
position (ie, those with depth greater than `current_depth`) as:
1. we don't really care about getting events that have happened
after our current position; and
2. by the nature of paginating and scrolling back, we have likely
previously tried and failed to backfill from that insertion event, so
to avoid getting "stuck" requesting the same backfill repeatedly
we drop those insertion event.
Args:
room_id: Room where we want to find the oldest events
current_depth: The depth at the user's current scrollback position
limit: The max number of insertion event extremities to return
Returns:
List of (event_id, depth) tuples. Sorted by depth, highest to lowest
(descending) so the closest events to the `current_depth` are first
in the list.
"""
def get_insertion_event_backward_extremities_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
if isinstance(self.database_engine, PostgresEngine):
least_function = "LEAST"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "MIN"
else:
raise RuntimeError("Unknown database engine")
sql = f"""
SELECT
insertion_event_extremity.event_id, event.depth
/* We only want insertion events that are also marked as backwards extremities */
FROM insertion_event_extremities AS insertion_event_extremity
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS event USING (event_id)
/**
* We use this info to make sure we don't retry to use a backfill point
* if we've already attempted to backfill from it recently.
*/
LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
ON
failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
WHERE
insertion_event_extremity.room_id = ?
/**
* We only want extremities that are older than or at
* the same position of the given `current_depth` (where older
* means less than the given depth) because we're looking backwards
* from the `current_depth` when backfilling.
*
* current_depth (ignore events that come after this, ignore 2-4)
* |
*
* <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
*/
AND event.depth <= ? /* current_depth */
/**
* Exponential back-off (up to the upper bound) so we don't retry the
* same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc
*
* We use `1 << n` as a power of 2 equivalent for compatibility
* with older SQLites. The left shift equivalent only works with
* powers of 2 because left shift is a binary operation (base-2).
* Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
*/
AND (
failed_backfill_attempt_info.event_id IS NULL
OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
(1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
* ? /* step */
)
)
/**
* Sort from highest (closest to the `current_depth`) to the lowest depth
* because the closest are most relevant to backfill from first.
* Then tie-break on alphabetical order of the event_ids so we get a
* consistent ordering which is nice when asserting things in tests.
*/
ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
LIMIT ?
"""
txn.execute(
sql,
(
room_id,
current_depth,
self._clock.time_msec(),
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
limit,
),
)
return cast(List[Tuple[str, int]], txn.fetchall())
return await self.db_pool.runInteraction(
"get_insertion_event_backward_extremities_in_room",
get_insertion_event_backward_extremities_in_room_txn,
room_id,
)
async def get_max_depth_of(
self, event_ids: Collection[str]
) -> Tuple[Optional[str], int]:
@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return event_ids
def _get_connected_batch_event_backfill_results_txn(
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
"""
Find any batch connections of a given insertion event.
A batch event points at a insertion event via:
batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
Args:
txn: The database transaction to use
insertion_event_id: The event ID to navigate from. We will find
batch events that point back at this insertion event.
limit: Max number of event ID's to query for and return
Returns:
List of batch events that the backfill queue can process
"""
batch_connection_query = """
SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
/* Find the batch that connects to the given insertion event */
INNER JOIN batch_events AS c
ON i.next_batch_id = c.batch_id
/* Get the depth of the batch start event from the events table */
INNER JOIN events AS e ON c.event_id = e.event_id
/* Find an insertion event which matches the given event_id */
WHERE i.event_id = ?
LIMIT ?
"""
# Find any batch connections for the given insertion event
txn.execute(
batch_connection_query,
(insertion_event_id, limit),
)
return [
BackfillQueueNavigationItem(
depth=row[0],
stream_ordering=row[1],
event_id=row[2],
type=row[3],
)
for row in txn
]
def _get_connected_prev_event_backfill_results_txn(
self, txn: LoggingTransaction, event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_id_results.add(event_id)
# Try and find any potential historical batches of message history.
if self.hs.config.experimental.msc2716_enabled:
# We need to go and try to find any batch events connected
# to a given insertion event (by batch_id). If we find any, we'll
# add them to the queue and navigate up the DAG like normal in the
# next iteration of the loop.
if event_type == EventTypes.MSC2716_INSERTION:
# Find any batch connections for the given insertion event
connected_batch_event_backfill_results = (
self._get_connected_batch_event_backfill_results_txn(
txn, event_id, limit - len(event_id_results)
)
)
logger.debug(
"_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
room_id,
connected_batch_event_backfill_results,
)
for (
connected_batch_event_backfill_item
) in connected_batch_event_backfill_results:
if (
connected_batch_event_backfill_item.event_id
not in event_id_results
):
queue.put(
(
-connected_batch_event_backfill_item.depth,
-connected_batch_event_backfill_item.stream_ordering,
connected_batch_event_backfill_item.event_id,
connected_batch_event_backfill_item.type,
)
)
# Now we just look up the DAG by prev_events as normal
connected_prev_event_backfill_results = (
self._get_connected_prev_event_backfill_results_txn(
@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
@trace
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
await self.db_pool.simple_upsert(
table="insertion_event_extremities",
keyvalues={"event_id": event_id},
values={
"event_id": event_id,
"room_id": room_id,
},
insertion_values={},
desc="insert_insertion_extremity",
)
async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:

View File

@ -1664,9 +1664,6 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
self._handle_insertion_event(txn, event)
self._handle_batch_event(txn, event)
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@ -1927,128 +1924,6 @@ class PersistEventsStore:
),
)
def _handle_insertion_event(
self, txn: LoggingTransaction, event: EventBase
) -> None:
"""Handles keeping track of insertion events and edges/connections.
Part of MSC2716.
Args:
txn: The database transaction object
event: The event to process
"""
if event.type != EventTypes.MSC2716_INSERTION:
# Not a insertion event
return
# Skip processing an insertion event if the room version doesn't
# support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
room_creator = self.db_pool.simple_select_one_onecol_txn(
txn,
table="rooms",
keyvalues={"room_id": event.room_id},
retcol="creator",
allow_none=True,
)
if not room_version.msc2716_historical and (
not self.hs.config.experimental.msc2716_enabled
or event.sender != room_creator
):
return
next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
if next_batch_id is None:
# Invalid insertion event without next batch ID
return
logger.debug(
"_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
)
# Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
table="insertion_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"next_batch_id": next_batch_id,
},
)
# Insert an edge for every prev_event connection
for prev_event_id in event.prev_event_ids():
self.db_pool.simple_insert_txn(
txn,
table="insertion_event_edges",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"insertion_prev_event_id": prev_event_id,
},
)
def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None:
"""Handles inserting the batch edges/connections between the batch event
and an insertion event. Part of MSC2716.
Args:
txn: The database transaction object
event: The event to process
"""
if event.type != EventTypes.MSC2716_BATCH:
# Not a batch event
return
# Skip processing a batch event if the room version doesn't
# support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
room_creator = self.db_pool.simple_select_one_onecol_txn(
txn,
table="rooms",
keyvalues={"room_id": event.room_id},
retcol="creator",
allow_none=True,
)
if not room_version.msc2716_historical and (
not self.hs.config.experimental.msc2716_enabled
or event.sender != room_creator
):
return
batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
if batch_id is None:
# Invalid batch event without a batch ID
return
logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
# Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
table="batch_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"batch_id": batch_id,
},
)
# When we receive an event with a `batch_id` referencing the
# `next_batch_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
WHERE next_batch_id = ?
)
"""
txn.execute(sql, (batch_id,))
def _handle_redact_relations(
self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
) -> None:

View File

@ -1,47 +0,0 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from synapse.storage._base import SQLBaseStore
class RoomBatchStore(SQLBaseStore):
async def get_insertion_event_id_by_batch_id(
self, room_id: str, batch_id: str
) -> Optional[str]:
"""Retrieve a insertion event ID.
Args:
batch_id: The batch ID of the insertion event to retrieve.
Returns:
The event_id of an insertion event, or None if there is no known
insertion event for the given insertion event.
"""
return await self.db_pool.simple_select_one_onecol(
table="insertion_events",
keyvalues={"room_id": room_id, "next_batch_id": batch_id},
retcol="event_id",
allow_none=True,
)
async def store_state_group_id_for_event_id(
self, event_id: str, state_group_id: int
) -> None:
await self.db_pool.simple_upsert(
table="event_to_state_groups",
keyvalues={"event_id": event_id},
values={"state_group": state_group_id, "event_id": event_id},
)

View File

@ -1,302 +0,0 @@
import logging
from typing import List, Tuple
from unittest.mock import Mock, patch
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventContentFields, EventTypes
from synapse.appservice import ApplicationService
from synapse.rest import admin
from synapse.rest.client import login, register, room, room_batch, sync
from synapse.server import HomeServer
from synapse.types import JsonDict, RoomStreamToken
from synapse.util import Clock
from tests import unittest
logger = logging.getLogger(__name__)
def _create_join_state_events_for_batch_send_request(
virtual_user_ids: List[str],
insert_time: int,
) -> List[JsonDict]:
return [
{
"type": EventTypes.Member,
"sender": virtual_user_id,
"origin_server_ts": insert_time,
"content": {
"membership": "join",
"displayname": "display-name-for-%s" % (virtual_user_id,),
},
"state_key": virtual_user_id,
}
for virtual_user_id in virtual_user_ids
]
def _create_message_events_for_batch_send_request(
virtual_user_id: str, insert_time: int, count: int
) -> List[JsonDict]:
return [
{
"type": EventTypes.Message,
"sender": virtual_user_id,
"origin_server_ts": insert_time,
"content": {
"msgtype": "m.text",
"body": "Historical %d" % (i),
EventContentFields.MSC2716_HISTORICAL: True,
},
}
for i in range(count)
]
class RoomBatchTestCase(unittest.HomeserverTestCase):
"""Test importing batches of historical messages."""
servlets = [
admin.register_servlets_for_client_rest_resource,
room_batch.register_servlets,
room.register_servlets,
register.register_servlets,
login.register_servlets,
sync.register_servlets,
]
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
self.appservice = ApplicationService(
token="i_am_an_app_service",
id="1234",
namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]},
# Note: this user does not have to match the regex above
sender="@as_main:test",
)
mock_load_appservices = Mock(return_value=[self.appservice])
with patch(
"synapse.storage.databases.main.appservice.load_appservices",
mock_load_appservices,
):
hs = self.setup_test_homeserver(config=config)
return hs
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.clock = clock
self._storage_controllers = hs.get_storage_controllers()
self.virtual_user_id, _ = self.register_appservice_user(
"as_user_potato", self.appservice.token
)
def _create_test_room(self) -> Tuple[str, str, str, str]:
room_id = self.helper.create_room_as(
self.appservice.sender, tok=self.appservice.token
)
res_a = self.helper.send_event(
room_id=room_id,
type=EventTypes.Message,
content={
"msgtype": "m.text",
"body": "A",
},
tok=self.appservice.token,
)
event_id_a = res_a["event_id"]
res_b = self.helper.send_event(
room_id=room_id,
type=EventTypes.Message,
content={
"msgtype": "m.text",
"body": "B",
},
tok=self.appservice.token,
)
event_id_b = res_b["event_id"]
res_c = self.helper.send_event(
room_id=room_id,
type=EventTypes.Message,
content={
"msgtype": "m.text",
"body": "C",
},
tok=self.appservice.token,
)
event_id_c = res_c["event_id"]
return room_id, event_id_a, event_id_b, event_id_c
@unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
def test_same_state_groups_for_whole_historical_batch(self) -> None:
"""Make sure that when using the `/batch_send` endpoint to import a
bunch of historical messages, it re-uses the same `state_group` across
the whole batch. This is an easy optimization to make sure we're getting
right because the state for the whole batch is contained in
`state_events_at_start` and can be shared across everything.
"""
time_before_room = int(self.clock.time_msec())
room_id, event_id_a, _, _ = self._create_test_room()
channel = self.make_request(
"POST",
"/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
% (room_id, event_id_a),
content={
"events": _create_message_events_for_batch_send_request(
self.virtual_user_id, time_before_room, 3
),
"state_events_at_start": _create_join_state_events_for_batch_send_request(
[self.virtual_user_id], time_before_room
),
},
access_token=self.appservice.token,
)
self.assertEqual(channel.code, 200, channel.result)
# Get the historical event IDs that we just imported
historical_event_ids = channel.json_body["event_ids"]
self.assertEqual(len(historical_event_ids), 3)
# Fetch the state_groups
state_group_map = self.get_success(
self._storage_controllers.state.get_state_groups_ids(
room_id, historical_event_ids
)
)
# We expect all of the historical events to be using the same state_group
# so there should only be a single state_group here!
self.assertEqual(
len(state_group_map.keys()),
1,
"Expected a single state_group to be returned by saw state_groups=%s"
% (state_group_map.keys(),),
)
@unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
def test_sync_while_batch_importing(self) -> None:
"""
Make sure that /sync correctly returns full room state when a user joins
during ongoing batch backfilling.
See: https://github.com/matrix-org/synapse/issues/12281
"""
# Create user who will be invited & join room
user_id = self.register_user("beep", "test")
user_tok = self.login("beep", "test")
time_before_room = int(self.clock.time_msec())
# Create a room with some events
room_id, _, _, _ = self._create_test_room()
# Invite the user
self.helper.invite(
room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
)
# Create another room, send a bunch of events to advance the stream token
other_room_id = self.helper.create_room_as(
self.appservice.sender, tok=self.appservice.token
)
for _ in range(5):
self.helper.send_event(
room_id=other_room_id,
type=EventTypes.Message,
content={"msgtype": "m.text", "body": "C"},
tok=self.appservice.token,
)
# Join the room as the normal user
self.helper.join(room_id, user_id, tok=user_tok)
# Create an event to hang the historical batch from - In order to see
# the failure case originally reported in #12281, the historical batch
# must be hung from the most recent event in the room so the base
# insertion event ends up with the highest `topogological_ordering`
# (`depth`) in the room but will have a negative `stream_ordering`
# because it's a `historical` event. Previously, when assembling the
# `state` for the `/sync` response, the bugged logic would sort by
# `topological_ordering` descending and pick up the base insertion
# event because it has a negative `stream_ordering` below the given
# pagination token. Now we properly sort by `stream_ordering`
# descending which puts `historical` events with a negative
# `stream_ordering` way at the bottom and aren't selected as expected.
response = self.helper.send_event(
room_id=room_id,
type=EventTypes.Message,
content={
"msgtype": "m.text",
"body": "C",
},
tok=self.appservice.token,
)
event_to_hang_id = response["event_id"]
channel = self.make_request(
"POST",
"/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
% (room_id, event_to_hang_id),
content={
"events": _create_message_events_for_batch_send_request(
self.virtual_user_id, time_before_room, 3
),
"state_events_at_start": _create_join_state_events_for_batch_send_request(
[self.virtual_user_id], time_before_room
),
},
access_token=self.appservice.token,
)
self.assertEqual(channel.code, 200, channel.result)
# Now we need to find the invite + join events stream tokens so we can sync between
main_store = self.hs.get_datastores().main
events, next_key = self.get_success(
main_store.get_recent_events_for_room(
room_id,
50,
end_token=main_store.get_room_max_token(),
),
)
invite_event_position = None
for event in events:
if (
event.type == "m.room.member"
and event.content["membership"] == "invite"
):
invite_event_position = self.get_success(
main_store.get_topological_token_for_event(event.event_id)
)
break
assert invite_event_position is not None, "No invite event found"
# Remove the topological order from the token by re-creating w/stream only
invite_event_position = RoomStreamToken(None, invite_event_position.stream)
# Sync everything after this token
since_token = self.get_success(invite_event_position.to_string(main_store))
sync_response = self.make_request(
"GET",
f"/sync?since={since_token}",
access_token=user_tok,
)
# Assert that, for this room, the user was considered to have joined and thus
# receives the full state history
state_event_types = [
event["type"]
for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
"events"
]
]
assert (
"m.room.create" in state_event_types
), "Missing room full state in sync response"

View File

@ -20,7 +20,6 @@ from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
@ -924,216 +923,6 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])
def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
"""
Sets up a room with various insertion event backward extremities to test
backfill functions against.
Returns:
_BackfillSetupInfo including the `room_id` to test against and
`depth_map` of events in the room
"""
room_id = "!backfill-room-test:some-host"
depth_map: Dict[str, int] = {
"1": 1,
"2": 2,
"insertion_eventA": 3,
"3": 4,
"insertion_eventB": 5,
"4": 6,
"5": 7,
}
def populate_db(txn: LoggingTransaction) -> None:
# Insert the room to satisfy the foreign key constraint of
# `event_failed_pull_attempts`
self.store.db_pool.simple_insert_txn(
txn,
"rooms",
{
"room_id": room_id,
"creator": "room_creator_user_id",
"is_public": True,
"room_version": "6",
},
)
# Insert our server events
stream_ordering = 0
for event_id, depth in depth_map.items():
self.store.db_pool.simple_insert_txn(
txn,
table="events",
values={
"event_id": event_id,
"type": EventTypes.MSC2716_INSERTION
if event_id.startswith("insertion_event")
else "test_regular_type",
"room_id": room_id,
"depth": depth,
"topological_ordering": depth,
"stream_ordering": stream_ordering,
"processed": True,
"outlier": False,
},
)
if event_id.startswith("insertion_event"):
self.store.db_pool.simple_insert_txn(
txn,
table="insertion_event_extremities",
values={
"event_id": event_id,
"room_id": room_id,
},
)
stream_ordering += 1
self.get_success(
self.store.db_pool.runInteraction(
"_setup_room_for_insertion_backfill_tests_populate_db",
populate_db,
)
)
return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
def test_get_insertion_event_backward_extremities_in_room(self) -> None:
"""
Test to make sure only insertion event backward extremities that are
older and come before the `current_depth` are returned.
"""
setup_info = self._setup_room_for_insertion_backfill_tests()
room_id = setup_info.room_id
depth_map = setup_info.depth_map
# Try at "insertion_eventB"
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(
room_id, depth_map["insertion_eventB"], limit=100
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["insertion_eventB", "insertion_eventA"])
# Try at "insertion_eventA"
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(
room_id, depth_map["insertion_eventA"], limit=100
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
# Event "2" has a depth of 2 but is not included here because we only
# know the approximate depth of 5 from our event "3".
self.assertListEqual(backfill_event_ids, ["insertion_eventA"])
def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted(
self,
) -> None:
"""
Test to make sure that insertion events we have attempted to backfill
(and within backoff timeout duration) do not show up as an event to
backfill again.
"""
setup_info = self._setup_room_for_insertion_backfill_tests()
room_id = setup_info.room_id
depth_map = setup_info.depth_map
# Record some attempts to backfill these events which will make
# `get_insertion_event_backward_extremities_in_room` exclude them
# because we haven't passed the backoff interval.
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
# No time has passed since we attempted to backfill ^
# Try at "insertion_eventB"
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(
room_id, depth_map["insertion_eventB"], limit=100
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
# Only the backfill points that we didn't record earlier exist here.
self.assertEqual(backfill_event_ids, ["insertion_eventB"])
def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
self,
) -> None:
"""
Test to make sure after we fake attempt to backfill event
"insertion_eventA" many times, we can see retry and see the
"insertion_eventA" again after the backoff timeout duration has
exceeded.
"""
setup_info = self._setup_room_for_insertion_backfill_tests()
room_id = setup_info.room_id
depth_map = setup_info.depth_map
# Record some attempts to backfill these events which will make
# `get_backfill_points_in_room` exclude them because we
# haven't passed the backoff interval.
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventB", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
# Now advance time by 2 hours and we should only be able to see
# "insertion_eventB" because we have waited long enough for the single
# attempt (2^1 hours) but we still shouldn't see "insertion_eventA"
# because we haven't waited long enough for this many attempts.
self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
# Try at "insertion_eventA" and make sure that "insertion_eventA" is not
# in the list because we've already attempted many times
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(
room_id, depth_map["insertion_eventA"], limit=100
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, [])
# Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
# see if we can now backfill it
self.reactor.advance(datetime.timedelta(hours=20).total_seconds())
# Try at "insertion_eventA" again after we advanced enough time and we
# should see "insertion_eventA" again
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(
room_id, depth_map["insertion_eventA"], limit=100
)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["insertion_eventA"])
def test_get_event_ids_with_failed_pull_attempts(self) -> None:
"""
Test to make sure we properly get event_ids based on whether they have any