mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-11-03 21:57:26 +00:00 
			
		
		
		
	Add event_stream_ordering column to membership state tables (#14979)
				
					
				
			This adds an `event_stream_ordering` column to `current_state_events`, `local_current_membership` and `room_memberships`. Each of these tables is regularly joined with the `events` table to get the stream ordering and denormalising this into each table will yield significant query performance improvements once used. Includes a background job to populate these values from the `events` table. Same idea as https://github.com/matrix-org/synapse/pull/13703. Signed off by Nick @ Beeper (@fizzadar).
This commit is contained in:
		
							parent
							
								
									64a631879c
								
							
						
					
					
						commit
						5fdc12f482
					
				
							
								
								
									
										1
									
								
								changelog.d/14979.misc
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								changelog.d/14979.misc
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1 @@
 | 
			
		||||
Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).
 | 
			
		||||
@ -1147,11 +1147,15 @@ class PersistEventsStore:
 | 
			
		||||
                # been inserted into room_memberships.
 | 
			
		||||
                txn.execute_batch(
 | 
			
		||||
                    """INSERT INTO current_state_events
 | 
			
		||||
                        (room_id, type, state_key, event_id, membership)
 | 
			
		||||
                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
 | 
			
		||||
                        (room_id, type, state_key, event_id, membership, event_stream_ordering)
 | 
			
		||||
                    VALUES (
 | 
			
		||||
                        ?, ?, ?, ?,
 | 
			
		||||
                        (SELECT membership FROM room_memberships WHERE event_id = ?),
 | 
			
		||||
                        (SELECT stream_ordering FROM events WHERE event_id = ?)
 | 
			
		||||
                    )
 | 
			
		||||
                    """,
 | 
			
		||||
                    [
 | 
			
		||||
                        (room_id, key[0], key[1], ev_id, ev_id)
 | 
			
		||||
                        (room_id, key[0], key[1], ev_id, ev_id, ev_id)
 | 
			
		||||
                        for key, ev_id in to_insert.items()
 | 
			
		||||
                    ],
 | 
			
		||||
                )
 | 
			
		||||
@ -1178,11 +1182,15 @@ class PersistEventsStore:
 | 
			
		||||
            if to_insert:
 | 
			
		||||
                txn.execute_batch(
 | 
			
		||||
                    """INSERT INTO local_current_membership
 | 
			
		||||
                        (room_id, user_id, event_id, membership)
 | 
			
		||||
                    VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
 | 
			
		||||
                        (room_id, user_id, event_id, membership, event_stream_ordering)
 | 
			
		||||
                    VALUES (
 | 
			
		||||
                        ?, ?, ?,
 | 
			
		||||
                        (SELECT membership FROM room_memberships WHERE event_id = ?),
 | 
			
		||||
                        (SELECT stream_ordering FROM events WHERE event_id = ?)
 | 
			
		||||
                    )
 | 
			
		||||
                    """,
 | 
			
		||||
                    [
 | 
			
		||||
                        (room_id, key[1], ev_id, ev_id)
 | 
			
		||||
                        (room_id, key[1], ev_id, ev_id, ev_id)
 | 
			
		||||
                        for key, ev_id in to_insert.items()
 | 
			
		||||
                        if key[0] == EventTypes.Member and self.is_mine_id(key[1])
 | 
			
		||||
                    ],
 | 
			
		||||
@ -1790,6 +1798,7 @@ class PersistEventsStore:
 | 
			
		||||
            table="room_memberships",
 | 
			
		||||
            keys=(
 | 
			
		||||
                "event_id",
 | 
			
		||||
                "event_stream_ordering",
 | 
			
		||||
                "user_id",
 | 
			
		||||
                "sender",
 | 
			
		||||
                "room_id",
 | 
			
		||||
@ -1800,6 +1809,7 @@ class PersistEventsStore:
 | 
			
		||||
            values=[
 | 
			
		||||
                (
 | 
			
		||||
                    event.event_id,
 | 
			
		||||
                    event.internal_metadata.stream_ordering,
 | 
			
		||||
                    event.state_key,
 | 
			
		||||
                    event.user_id,
 | 
			
		||||
                    event.room_id,
 | 
			
		||||
@ -1832,6 +1842,7 @@ class PersistEventsStore:
 | 
			
		||||
                    keyvalues={"room_id": event.room_id, "user_id": event.state_key},
 | 
			
		||||
                    values={
 | 
			
		||||
                        "event_id": event.event_id,
 | 
			
		||||
                        "event_stream_ordering": event.internal_metadata.stream_ordering,
 | 
			
		||||
                        "membership": event.membership,
 | 
			
		||||
                    },
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, ca
 | 
			
		||||
 | 
			
		||||
import attr
 | 
			
		||||
 | 
			
		||||
from synapse.api.constants import EventContentFields, RelationTypes
 | 
			
		||||
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
 | 
			
		||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 | 
			
		||||
from synapse.events import make_event_from_dict
 | 
			
		||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 | 
			
		||||
@ -71,6 +71,10 @@ class _BackgroundUpdates:
 | 
			
		||||
 | 
			
		||||
    EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
 | 
			
		||||
 | 
			
		||||
    POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = (
 | 
			
		||||
        "populate_membership_event_stream_ordering"
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
 | 
			
		||||
class _CalculateChainCover:
 | 
			
		||||
@ -99,6 +103,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 | 
			
		||||
    ):
 | 
			
		||||
        super().__init__(database, db_conn, hs)
 | 
			
		||||
 | 
			
		||||
        self.db_pool.updates.register_background_update_handler(
 | 
			
		||||
            _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
 | 
			
		||||
            self._populate_membership_event_stream_ordering,
 | 
			
		||||
        )
 | 
			
		||||
        self.db_pool.updates.register_background_update_handler(
 | 
			
		||||
            _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
 | 
			
		||||
            self._background_reindex_origin_server_ts,
 | 
			
		||||
@ -1498,3 +1506,97 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        return batch_size
 | 
			
		||||
 | 
			
		||||
    async def _populate_membership_event_stream_ordering(
 | 
			
		||||
        self, progress: JsonDict, batch_size: int
 | 
			
		||||
    ) -> int:
 | 
			
		||||
        def _populate_membership_event_stream_ordering(
 | 
			
		||||
            txn: LoggingTransaction,
 | 
			
		||||
        ) -> bool:
 | 
			
		||||
 | 
			
		||||
            if "max_stream_ordering" in progress:
 | 
			
		||||
                max_stream_ordering = progress["max_stream_ordering"]
 | 
			
		||||
            else:
 | 
			
		||||
                txn.execute("SELECT max(stream_ordering) FROM events")
 | 
			
		||||
                res = txn.fetchone()
 | 
			
		||||
                if res is None or res[0] is None:
 | 
			
		||||
                    return True
 | 
			
		||||
                else:
 | 
			
		||||
                    max_stream_ordering = res[0]
 | 
			
		||||
 | 
			
		||||
            start = progress.get("stream_ordering", 0)
 | 
			
		||||
            stop = start + batch_size
 | 
			
		||||
 | 
			
		||||
            sql = f"""
 | 
			
		||||
                SELECT room_id, event_id, stream_ordering
 | 
			
		||||
                FROM events
 | 
			
		||||
                WHERE
 | 
			
		||||
                    type = '{EventTypes.Member}'
 | 
			
		||||
                    AND stream_ordering >= ?
 | 
			
		||||
                    AND stream_ordering < ?
 | 
			
		||||
            """
 | 
			
		||||
            txn.execute(sql, (start, stop))
 | 
			
		||||
 | 
			
		||||
            rows: List[Tuple[str, str, int]] = cast(
 | 
			
		||||
                List[Tuple[str, str, int]], txn.fetchall()
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            event_ids: List[Tuple[str]] = []
 | 
			
		||||
            event_stream_orderings: List[Tuple[int]] = []
 | 
			
		||||
 | 
			
		||||
            for _, event_id, event_stream_ordering in rows:
 | 
			
		||||
                event_ids.append((event_id,))
 | 
			
		||||
                event_stream_orderings.append((event_stream_ordering,))
 | 
			
		||||
 | 
			
		||||
            self.db_pool.simple_update_many_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                table="current_state_events",
 | 
			
		||||
                key_names=("event_id",),
 | 
			
		||||
                key_values=event_ids,
 | 
			
		||||
                value_names=("event_stream_ordering",),
 | 
			
		||||
                value_values=event_stream_orderings,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            self.db_pool.simple_update_many_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                table="room_memberships",
 | 
			
		||||
                key_names=("event_id",),
 | 
			
		||||
                key_values=event_ids,
 | 
			
		||||
                value_names=("event_stream_ordering",),
 | 
			
		||||
                value_values=event_stream_orderings,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # NOTE: local_current_membership has no index on event_id, so only
 | 
			
		||||
            # the room ID here will reduce the query rows read.
 | 
			
		||||
            for room_id, event_id, event_stream_ordering in rows:
 | 
			
		||||
                txn.execute(
 | 
			
		||||
                    """
 | 
			
		||||
                        UPDATE local_current_membership
 | 
			
		||||
                        SET event_stream_ordering = ?
 | 
			
		||||
                        WHERE room_id = ? AND event_id = ?
 | 
			
		||||
                    """,
 | 
			
		||||
                    (event_stream_ordering, room_id, event_id),
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            self.db_pool.updates._background_update_progress_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
 | 
			
		||||
                {
 | 
			
		||||
                    "stream_ordering": stop,
 | 
			
		||||
                    "max_stream_ordering": max_stream_ordering,
 | 
			
		||||
                },
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            return stop > max_stream_ordering
 | 
			
		||||
 | 
			
		||||
        finished = await self.db_pool.runInteraction(
 | 
			
		||||
            "_populate_membership_event_stream_ordering",
 | 
			
		||||
            _populate_membership_event_stream_ordering,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if finished:
 | 
			
		||||
            await self.db_pool.updates._end_background_update(
 | 
			
		||||
                _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        return batch_size
 | 
			
		||||
 | 
			
		||||
@ -1779,7 +1779,7 @@ class EventsWorkerStore(SQLBaseStore):
 | 
			
		||||
            txn: LoggingTransaction,
 | 
			
		||||
        ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
 | 
			
		||||
            sql = (
 | 
			
		||||
                "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
 | 
			
		||||
                "SELECT out.event_stream_ordering, e.event_id, e.room_id, e.type,"
 | 
			
		||||
                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
 | 
			
		||||
                " e.outlier"
 | 
			
		||||
                " FROM events AS e"
 | 
			
		||||
@ -1791,10 +1791,10 @@ class EventsWorkerStore(SQLBaseStore):
 | 
			
		||||
                " LEFT JOIN event_relations USING (event_id)"
 | 
			
		||||
                " LEFT JOIN room_memberships USING (event_id)"
 | 
			
		||||
                " LEFT JOIN rejections USING (event_id)"
 | 
			
		||||
                " WHERE ? < event_stream_ordering"
 | 
			
		||||
                " AND event_stream_ordering <= ?"
 | 
			
		||||
                " WHERE ? < out.event_stream_ordering"
 | 
			
		||||
                " AND out.event_stream_ordering <= ?"
 | 
			
		||||
                " AND out.instance_name = ?"
 | 
			
		||||
                " ORDER BY event_stream_ordering ASC"
 | 
			
		||||
                " ORDER BY out.event_stream_ordering ASC"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            txn.execute(sql, (last_id, current_id, instance_name))
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,21 @@
 | 
			
		||||
/* Copyright 2022 Beeper
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
 | 
			
		||||
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
 | 
			
		||||
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
 | 
			
		||||
 | 
			
		||||
INSERT INTO background_updates (update_name, progress_json) VALUES
 | 
			
		||||
  ('populate_membership_event_stream_ordering', '{}');
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user