mirror of
https://github.com/matrix-org/synapse.git
synced 2025-01-21 23:56:00 +00:00
Replace device_27_unique_idx bg update with a fg one (#7562)
The bg update never managed to complete, because it kept being interrupted by transactions which want to take a lock. Just doing it in the foreground isn't that bad, and is a good deal simpler.
This commit is contained in:
parent
04729b86f8
commit
edd9a7214c
@ -75,9 +75,15 @@ for example:
|
|||||||
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||||
|
|
||||||
Upgrading to v1.13.0
|
Upgrading to v1.14.0
|
||||||
====================
|
====================
|
||||||
|
|
||||||
|
This version includes a database update which is run as part of the upgrade,
|
||||||
|
and which may take a couple of minutes in the case of a large server. Synapse
|
||||||
|
will not respond to HTTP requests while this update is taking place.
|
||||||
|
|
||||||
|
Upgrading to v1.13.0
|
||||||
|
====================
|
||||||
|
|
||||||
Incorrect database migration in old synapse versions
|
Incorrect database migration in old synapse versions
|
||||||
----------------------------------------------------
|
----------------------------------------------------
|
||||||
|
1
changelog.d/7562.misc
Normal file
1
changelog.d/7562.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Improve performance of `mark_as_sent_devices_by_remote`.
|
@ -55,10 +55,6 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
|
|||||||
|
|
||||||
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
|
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
|
||||||
|
|
||||||
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = (
|
|
||||||
"drop_device_lists_outbound_last_success_non_unique_idx"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceWorkerStore(SQLBaseStore):
|
class DeviceWorkerStore(SQLBaseStore):
|
||||||
def get_device(self, user_id, device_id):
|
def get_device(self, user_id, device_id):
|
||||||
@ -749,19 +745,13 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
|||||||
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
|
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
|
||||||
)
|
)
|
||||||
|
|
||||||
# create a unique index on device_lists_outbound_last_success
|
# a pair of background updates that were added during the 1.14 release cycle,
|
||||||
self.db.updates.register_background_index_update(
|
# but replaced with 58/06dlols_unique_idx.py
|
||||||
|
self.db.updates.register_noop_background_update(
|
||||||
"device_lists_outbound_last_success_unique_idx",
|
"device_lists_outbound_last_success_unique_idx",
|
||||||
index_name="device_lists_outbound_last_success_unique_idx",
|
|
||||||
table="device_lists_outbound_last_success",
|
|
||||||
columns=["destination", "user_id"],
|
|
||||||
unique=True,
|
|
||||||
)
|
)
|
||||||
|
self.db.updates.register_noop_background_update(
|
||||||
# once that completes, we can remove the old non-unique index.
|
"drop_device_lists_outbound_last_success_non_unique_idx",
|
||||||
self.db.updates.register_background_update_handler(
|
|
||||||
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX,
|
|
||||||
self._drop_device_lists_outbound_last_success_non_unique_idx,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@ -838,20 +828,6 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
|||||||
|
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
async def _drop_device_lists_outbound_last_success_non_unique_idx(
|
|
||||||
self, progress, batch_size
|
|
||||||
):
|
|
||||||
def f(txn):
|
|
||||||
txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx")
|
|
||||||
|
|
||||||
await self.db.runInteraction(
|
|
||||||
"drop_device_lists_outbound_last_success_non_unique_idx", f,
|
|
||||||
)
|
|
||||||
await self.db.updates._end_background_update(
|
|
||||||
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX
|
|
||||||
)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||||
def __init__(self, database: Database, db_conn, hs):
|
def __init__(self, database: Database, db_conn, hs):
|
||||||
|
@ -1,28 +0,0 @@
|
|||||||
/* Copyright 2020 The Matrix.org Foundation C.I.C
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
-- register a background update which will create a unique index on
|
|
||||||
-- device_lists_outbound_last_success
|
|
||||||
INSERT into background_updates (ordering, update_name, progress_json)
|
|
||||||
VALUES (5804, 'device_lists_outbound_last_success_unique_idx', '{}');
|
|
||||||
|
|
||||||
-- once that completes, we can drop the old index.
|
|
||||||
INSERT into background_updates (ordering, update_name, progress_json, depends_on)
|
|
||||||
VALUES (
|
|
||||||
5804,
|
|
||||||
'drop_device_lists_outbound_last_success_non_unique_idx',
|
|
||||||
'{}',
|
|
||||||
'device_lists_outbound_last_success_unique_idx'
|
|
||||||
);
|
|
@ -0,0 +1,80 @@
|
|||||||
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
This migration rebuilds the device_lists_outbound_last_success table without duplicate
|
||||||
|
entries, and with a UNIQUE index.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from io import StringIO
|
||||||
|
|
||||||
|
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
|
||||||
|
from synapse.storage.prepare_database import execute_statements_from_stream
|
||||||
|
from synapse.storage.types import Cursor
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def run_upgrade(*args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
|
||||||
|
# some instances might already have this index, in which case we can skip this
|
||||||
|
if isinstance(database_engine, PostgresEngine):
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
SELECT 1 FROM pg_class WHERE relkind = 'i'
|
||||||
|
AND relname = 'device_lists_outbound_last_success_unique_idx'
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
if cur.rowcount:
|
||||||
|
logger.info(
|
||||||
|
"Unique index exists on device_lists_outbound_last_success: "
|
||||||
|
"skipping rebuild"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("Rebuilding device_lists_outbound_last_success with unique index")
|
||||||
|
execute_statements_from_stream(cur, StringIO(_rebuild_commands))
|
||||||
|
|
||||||
|
|
||||||
|
# there might be duplicates, so the easiest way to achieve this is to create a new
|
||||||
|
# table with the right data, and renaming it into place
|
||||||
|
|
||||||
|
_rebuild_commands = """
|
||||||
|
DROP TABLE IF EXISTS device_lists_outbound_last_success_new;
|
||||||
|
|
||||||
|
CREATE TABLE device_lists_outbound_last_success_new (
|
||||||
|
destination TEXT NOT NULL,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
stream_id BIGINT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- this took about 30 seconds on matrix.org's 16 million rows.
|
||||||
|
INSERT INTO device_lists_outbound_last_success_new
|
||||||
|
SELECT destination, user_id, MAX(stream_id) FROM device_lists_outbound_last_success
|
||||||
|
GROUP BY destination, user_id;
|
||||||
|
|
||||||
|
-- and this another 30 seconds.
|
||||||
|
CREATE UNIQUE INDEX device_lists_outbound_last_success_unique_idx
|
||||||
|
ON device_lists_outbound_last_success_new (destination, user_id);
|
||||||
|
|
||||||
|
DROP TABLE device_lists_outbound_last_success;
|
||||||
|
|
||||||
|
ALTER TABLE device_lists_outbound_last_success_new
|
||||||
|
RENAME TO device_lists_outbound_last_success;
|
||||||
|
"""
|
@ -78,7 +78,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
|
|||||||
"device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
|
"device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
|
||||||
"device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
|
"device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
|
||||||
"event_search": "event_search_event_id_idx",
|
"event_search": "event_search_event_id_idx",
|
||||||
"device_lists_outbound_last_success": "device_lists_outbound_last_success_unique_idx",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,10 +19,12 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from collections import Counter
|
from collections import Counter
|
||||||
|
from typing import TextIO
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from synapse.storage.engines.postgres import PostgresEngine
|
from synapse.storage.engines.postgres import PostgresEngine
|
||||||
|
from synapse.storage.types import Cursor
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -479,8 +481,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.info("applying schema %s for %s", name, modname)
|
logger.info("applying schema %s for %s", name, modname)
|
||||||
for statement in get_statements(stream):
|
execute_statements_from_stream(cur, stream)
|
||||||
cur.execute(statement)
|
|
||||||
|
|
||||||
# Mark as done.
|
# Mark as done.
|
||||||
cur.execute(
|
cur.execute(
|
||||||
@ -538,8 +539,12 @@ def get_statements(f):
|
|||||||
|
|
||||||
def executescript(txn, schema_path):
|
def executescript(txn, schema_path):
|
||||||
with open(schema_path, "r") as f:
|
with open(schema_path, "r") as f:
|
||||||
|
execute_statements_from_stream(txn, f)
|
||||||
|
|
||||||
|
|
||||||
|
def execute_statements_from_stream(cur: Cursor, f: TextIO):
|
||||||
for statement in get_statements(f):
|
for statement in get_statements(f):
|
||||||
txn.execute(statement)
|
cur.execute(statement)
|
||||||
|
|
||||||
|
|
||||||
def _get_or_create_schema_state(txn, database_engine):
|
def _get_or_create_schema_state(txn, database_engine):
|
||||||
|
Loading…
Reference in New Issue
Block a user