Make cleaning up pushers depend on the device_id instead of the token_id (#15280)

This makes it so that we rely on the `device_id` to delete pushers on logout,
instead of relying on the `access_token_id`. This ensures we're not removing
pushers on token refresh, and prepares for a world without access token IDs
(i.e. once authentication is delegated to an OIDC provider).

This also schedules the `set_device_id_for_pushers` background update so that
it actually runs; scheduling it was forgotten in #13831.

Note that for backwards compatibility it still deletes pushers based on the
`access_token` until the background update finishes.
This commit is contained in:
Quentin Gliech 2023-03-24 16:09:39 +01:00 committed by GitHub
parent 68a6717312
commit 5b70f240cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 142 additions and 65 deletions

1
changelog.d/15280.misc Normal file
View File

@ -0,0 +1 @@
Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations.

View File

@ -68,7 +68,10 @@ from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.pusher import (
PusherBackgroundUpdatesStore,
PusherWorkerStore,
)
from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
@ -226,6 +229,7 @@ class Store(
AccountDataWorkerStore,
PushRuleStore,
PusherWorkerStore,
PusherBackgroundUpdatesStore,
PresenceBackgroundUpdateStore,
ReceiptsBackgroundUpdateStore,
RelationsWorkerStore,

View File

@ -1504,8 +1504,10 @@ class AuthHandler:
)
# delete pushers associated with this access token
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
if token.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
token.user_id, (token.token_id,)
)
@ -1535,7 +1537,9 @@ class AuthHandler:
)
# delete pushers associated with the access tokens
await self.hs.get_pusherpool().remove_pushers_by_access_token(
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)

View File

@ -503,6 +503,8 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
# Delete data specific to each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:

View File

@ -1013,11 +1013,11 @@ class RegistrationHandler:
user_tuple = await self.store.get_user_by_access_token(token)
# The token better still exist.
assert user_tuple
token_id = user_tuple.token_id
device_id = user_tuple.device_id
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",

View File

@ -103,7 +103,7 @@ class PusherConfig:
id: Optional[str]
user_name: str
access_token: Optional[int]
profile_tag: str
kind: str
app_id: str
@ -119,6 +119,11 @@ class PusherConfig:
enabled: bool
device_id: Optional[str]
# XXX(quenting): The access_token is not persisted anymore for new pushers, but we
# keep it when reading from the database, so that we don't get stale pushers
# while the "set_device_id_for_pushers" background update is running.
access_token: Optional[int]
def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
return {

View File

@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.types import JsonDict, RoomStreamToken
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email
@ -97,7 +97,6 @@ class PusherPool:
async def add_or_update_pusher(
self,
user_id: str,
access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
@ -128,6 +127,22 @@ class PusherPool:
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()
# Before we actually persist the pusher, we check if the user already has one
# for this app ID and pushkey. If so, we want to keep the access token and
# device ID in place, since this could be one device modifying
# (e.g. enabling/disabling) another device's pusher.
# XXX(quenting): Even though we're not persisting the access_token_id for new
# pushers anymore, we still need to copy existing access_token_ids over when
# updating a pusher, in case the "set_device_id_for_pushers" background update
# hasn't run yet.
access_token_id = None
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
device_id = existing_config.device_id
access_token_id = existing_config.access_token
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
@ -136,7 +151,6 @@ class PusherPool:
PusherConfig(
id=None,
user_name=user_id,
access_token=access_token,
profile_tag=profile_tag,
kind=kind,
app_id=app_id,
@ -151,23 +165,12 @@ class PusherPool:
failing_since=None,
enabled=enabled,
device_id=device_id,
access_token=access_token_id,
)
)
# Before we actually persist the pusher, we check if the user already has one
# this app ID and pushkey. If so, we want to keep the access token and device ID
# in place, since this could be one device modifying (e.g. enabling/disabling)
# another device's pusher.
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
access_token = existing_config.access_token
device_id = existing_config.device_id
await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
kind=kind,
app_id=app_id,
app_display_name=app_display_name,
@ -180,6 +183,7 @@ class PusherPool:
profile_tag=profile_tag,
enabled=enabled,
device_id=device_id,
access_token_id=access_token_id,
)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
@ -199,7 +203,7 @@ class PusherPool:
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
async def remove_pushers_by_access_token(
async def remove_pushers_by_access_tokens(
self, user_id: str, access_tokens: Iterable[int]
) -> None:
"""Remove the pushers for a given user corresponding to a set of
@ -209,6 +213,8 @@ class PusherPool:
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
# XXX(quenting): This is only needed until the "set_device_id_for_pushers"
# background update finishes
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
@ -220,6 +226,26 @@ class PusherPool:
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
async def remove_pushers_by_devices(
    self, user_id: str, devices: StrCollection
) -> None:
    """Delete every pusher of a user that is attached to one of the given devices.

    Args:
        user_id: the user whose pushers should be examined
        devices: IDs of the devices whose pushers must be removed
    """
    # Build the lookup set once, before fetching the user's pushers.
    targeted_devices = set(devices)
    user_pushers = await self.store.get_pushers_by_user_id(user_id)
    for pusher in user_pushers:
        # Pushers bound to any other device (or to none) are left untouched.
        if pusher.device_id not in targeted_devices:
            continue

        logger.info(
            "Removing pusher for app id %s, pushkey %s, user %s",
            pusher.app_id,
            pusher.pushkey,
            pusher.user_name,
        )
        await self.remove_pusher(pusher.app_id, pusher.pushkey, pusher.user_name)
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
if not self.pushers:
# nothing to do here.

View File

@ -425,7 +425,6 @@ class UserRestServletV2(RestServlet):
):
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=None,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",

View File

@ -126,7 +126,6 @@ class PushersSetRestServlet(RestServlet):
try:
await self.pusher_pool.add_or_update_pusher(
user_id=user.to_string(),
access_token=requester.access_token_id,
kind=content["kind"],
app_id=content["app_id"],
app_display_name=content["app_display_name"],

View File

@ -509,19 +509,24 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
async def _set_device_id_for_pushers(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update to populate the device_id column of the pushers table."""
"""
Background update to populate the device_id column and clear the access_token
column for the pushers table.
"""
last_pusher_id = progress.get("pusher_id", 0)
def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
txn.execute(
"""
SELECT p.id, at.device_id
SELECT
p.id AS pusher_id,
p.device_id AS pusher_device_id,
at.device_id AS token_device_id
FROM pushers AS p
INNER JOIN access_tokens AS at
LEFT JOIN access_tokens AS at
ON p.access_token = at.id
WHERE
p.access_token IS NOT NULL
AND at.device_id IS NOT NULL
AND p.id > ?
ORDER BY p.id
LIMIT ?
@ -533,13 +538,27 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
if len(rows) == 0:
return 0
# The reason we're clearing the access_token column here is a bit subtle.
# When a user logs out, we:
# (1) delete the access token
# (2) delete the device
#
# Ideally, we would delete the pushers only via its link to the device
# during (2), but since this background update might not have fully run yet,
# we're still deleting the pushers via the access token during (1).
self.db_pool.simple_update_many_txn(
txn=txn,
table="pushers",
key_names=("id",),
key_values=[(row["id"],) for row in rows],
value_names=("device_id",),
value_values=[(row["device_id"],) for row in rows],
key_values=[(row["pusher_id"],) for row in rows],
value_names=("device_id", "access_token"),
# If there was already a device_id on the pusher, we only want to clear
# the access_token column, so we keep the existing device_id. Otherwise,
# we set the device_id we got from joining the access_tokens table.
value_values=[
(row["pusher_device_id"] or row["token_device_id"], None)
for row in rows
],
)
self.db_pool.updates._background_update_progress_txn(
@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
async def add_pusher(
self,
user_id: str,
access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
@ -581,13 +599,13 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
profile_tag: str = "",
enabled: bool = True,
device_id: Optional[str] = None,
access_token_id: Optional[int] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
values={
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
@ -599,6 +617,10 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
"id": stream_id,
"enabled": enabled,
"device_id": device_id,
# XXX(quenting): We're only really persisting the access token ID
# when updating an existing pusher. This is in case the
# 'set_device_id_for_pushers' background update hasn't finished yet.
"access_token": access_token_id,
},
desc="add_pusher",
)

View File

@ -0,0 +1,19 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Triggers the 'set_device_id_for_pushers' background update, which sets the
-- device_id for pushers that don't have one, and clears the access_token column.
-- The empty progress JSON means the update starts from the first pusher: the
-- handler defaults its "pusher_id" progress marker to 0 and selects rows with
-- a greater id.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7402, 'set_device_id_for_pushers', '{}');

View File

@ -105,7 +105,7 @@ class EmailPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(self.access_token)
)
assert user_tuple is not None
self.token_id = user_tuple.token_id
self.device_id = user_tuple.device_id
# We need to add email to account before we can create a pusher.
self.get_success(
@ -117,7 +117,7 @@ class EmailPusherTests(HomeserverTestCase):
pusher = self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
@ -141,7 +141,7 @@ class EmailPusherTests(HomeserverTestCase):
self.get_success_or_raise(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",

View File

@ -67,13 +67,13 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
def test_data(data: Any) -> None:
self.get_failure(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -114,12 +114,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -235,12 +235,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -356,12 +356,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -443,12 +443,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -521,12 +521,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -628,12 +628,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -764,12 +764,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@ -778,7 +778,6 @@ class HTTPPusherTests(HomeserverTestCase):
lang=None,
data={"url": "http://example.com/_matrix/push/v1/notify"},
enabled=enabled,
device_id=user_tuple.device_id,
)
)
@ -895,19 +894,17 @@ class HTTPPusherTests(HomeserverTestCase):
def test_update_different_device_access_token_device_id(self) -> None:
"""Tests that if we create a pusher from one device, then update it from another
device, the access token and device ID associated with the pusher stays the
same.
device, the device ID associated with the pusher stays the same.
"""
# Create a user with a pusher.
user_id, access_token = self._make_user_with_pusher("user")
# Get the token ID for the current access token, since that's what we store in
# the pushers table. Also get the device ID from it.
# Get the device ID for the current access token, since that's what we store in
# the pushers table.
user_tuple = self.get_success(
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
# Generate a new access token, and update the pusher with it.
@ -920,10 +917,9 @@ class HTTPPusherTests(HomeserverTestCase):
)
pushers: List[PusherConfig] = list(ret)
# Check that we still have one pusher, and that the access token and device ID
# associated with it didn't change.
# Check that we still have one pusher, and that the device ID associated with
# it didn't change.
self.assertEqual(len(pushers), 1)
self.assertEqual(pushers[0].access_token, token_id)
self.assertEqual(pushers[0].device_id, device_id)
@override_config({"experimental_features": {"msc3881_enabled": True}})

View File

@ -51,12 +51,12 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_dict is not None
token_id = user_dict.token_id
device_id = user_dict.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",

View File

@ -3047,12 +3047,12 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.store.get_user_by_access_token(other_user_token)
)
assert user_tuple is not None
token_id = user_tuple.token_id
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.other_user,
access_token=token_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",