Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

Commit b20bdd3997 by Erik Johnston, 2023-11-16 16:27:21 +00:00
109 changed files with 2890 additions and 1159 deletions


@ -8,21 +8,21 @@
# If ignoring a pull request that was not squash merged, only the merge
# commit needs to be put here. Child commits will be resolved from it.
-# Run black (#3679).
+# Run black (https://github.com/matrix-org/synapse/pull/3679).
8b3d9b6b199abb87246f982d5db356f1966db925
-# Black reformatting (#5482).
+# Black reformatting (https://github.com/matrix-org/synapse/pull/5482).
32e7c9e7f20b57dd081023ac42d6931a8da9b3a3
-# Target Python 3.5 with black (#8664).
+# Target Python 3.5 with black (https://github.com/matrix-org/synapse/pull/8664).
aff1eb7c671b0a3813407321d2702ec46c71fa56
-# Update black to 20.8b1 (#9381).
+# Update black to 20.8b1 (https://github.com/matrix-org/synapse/pull/9381).
0a00b7ff14890987f09112a2ae696c61001e6cf1
-# Convert tests/rest/admin/test_room.py to unix file endings (#7953).
+# Convert tests/rest/admin/test_room.py to unix file endings (https://github.com/matrix-org/synapse/pull/7953).
c4268e3da64f1abb5b31deaeb5769adb6510c0a7
-# Update black to 23.1.0 (#15103)
+# Update black to 23.1.0 (https://github.com/matrix-org/synapse/pull/15103)
9bb2eac71962970d02842bca441f4bcdbbf93a11

Cargo.lock generated

@ -332,18 +332,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1"
dependencies = [
"proc-macro2",
"quote",


@ -0,0 +1 @@
Add support for asynchronous uploads as defined by [MSC2246](https://github.com/matrix-org/matrix-spec-proposals/pull/2246). Contributed by @sumnerevans at @beeper.

changelog.d/16051.misc Normal file

@ -0,0 +1 @@
Remove whole table locks on push rule modifications. Contributed by Nick @ Beeper (@fizzadar).

changelog.d/16456.misc Normal file

@ -0,0 +1 @@
Add a Postgres `REPLICA IDENTITY` to tables that do not have an implicit one. This should allow use of Postgres logical replication.


@ -1 +1 @@
-Improve the performance of claiming encryption keys in multi-worker deployments.
+Improve the performance of some operations in multi-worker deployments.

changelog.d/16615.misc Normal file

@ -0,0 +1 @@
Use more generic database methods.


@ -0,0 +1 @@
Improve the performance of some operations in multi-worker deployments.

changelog.d/16617.bugfix Normal file

@ -0,0 +1 @@
Fix a long-standing bug where Synapse would not unbind third-party identifiers for Application Service users when deactivated and would not emit a compliant response.

changelog.d/16618.misc Normal file

@ -0,0 +1 @@
Use `dbname` instead of the deprecated `database` connection parameter for psycopg2.

changelog.d/16628.doc Normal file

@ -0,0 +1 @@
Note that the option [`outbound_federation_restricted_to`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#outbound_federation_restricted_to) was added in Synapse 1.89.0, and fix a nearby formatting error.

changelog.d/16631.doc Normal file

@ -0,0 +1 @@
Update parameter information for the `/timestamp_to_event` admin API.

changelog.d/16634.misc Normal file

@ -0,0 +1 @@
Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA.

changelog.d/16637.misc Normal file

@ -0,0 +1 @@
Improve references to GitHub issues.

changelog.d/16638.misc Normal file

@ -0,0 +1 @@
Improve references to GitHub issues.

changelog.d/16639.bugfix Normal file

@ -0,0 +1 @@
Fix sending out of order `POSITION` over replication, causing additional database load.

changelog.d/16640.misc Normal file

@ -0,0 +1 @@
More efficiently handle no-op `POSITION` over replication.

changelog.d/16643.misc Normal file

@ -0,0 +1 @@
Speed up deletion of device messages when deleting a device.

changelog.d/16647.misc Normal file

@ -0,0 +1 @@
Add a Postgres `REPLICA IDENTITY` to tables that do not have an implicit one. This should allow use of Postgres logical replication.

changelog.d/16649.misc Normal file

@ -0,0 +1 @@
Speed up persisting a large number of outliers.

debian/changelog vendored

@ -1637,7 +1637,7 @@ matrix-synapse-py3 (0.99.3.1) stable; urgency=medium
matrix-synapse-py3 (0.99.3) stable; urgency=medium
[ Richard van der Hoff ]
-* Fix warning during preconfiguration. (Fixes: #4819)
+* Fix warning during preconfiguration. (Fixes: https://github.com/matrix-org/synapse/issues/4819)
[ Synapse Packaging team ]
* New synapse release 0.99.3.


@ -536,7 +536,8 @@ The following query parameters are available:
**Response**
-* `event_id` - converted from timestamp
+* `event_id` - The event ID closest to the given timestamp.
+* `origin_server_ts` - The timestamp of the event in milliseconds since the Unix epoch.
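As an illustration (this example is not part of the original file), a successful response carries exactly these two fields; the values below are invented:

```json
{
  "event_id": "$143273582443PhrSn:example.org",
  "origin_server_ts": 1432735824653
}
```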
# Block Room API
The Block Room admin API allows server admins to block and unblock rooms,


@ -773,6 +773,43 @@ Note: The token will expire if the *admin* user calls `/logout/all` from any
of their devices, but the token will *not* expire if the target user does the
same.
## Allow replacing master cross-signing key without User-Interactive Auth
This endpoint is not intended for server administrator usage;
we describe it here for completeness.
This API temporarily permits a user to replace their master cross-signing key
without going through
[user-interactive authentication](https://spec.matrix.org/v1.8/client-server-api/#user-interactive-authentication-api) (UIA).
This is useful when Synapse has delegated its authentication to the
[Matrix Authentication Service](https://github.com/matrix-org/matrix-authentication-service/);
as Synapse cannot perform UIA in these circumstances.
The API is
```http request
POST /_synapse/admin/v1/users/<user_id>/_allow_cross_signing_replacement_without_uia
{}
```
If the user does not exist, or does exist but has no master cross-signing key,
this will return with status code `404 Not Found`.
Otherwise, a response body like the following is returned, with status `200 OK`:
```json
{
"updatable_without_uia_before_ms": 1234567890
}
```
The response body is a JSON object with a single field:
- `updatable_without_uia_before_ms`: integer. The timestamp in milliseconds
before which the user is permitted to replace their cross-signing key without
going through UIA.
_Added in Synapse 1.97.0._
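For illustration only (this sketch is not part of the commit), the endpoint above could be invoked as follows; the homeserver URL, admin access token and user ID are assumed placeholder values:

```python
# Minimal sketch of calling the endpoint documented above. The endpoint path,
# status codes and response field come from the documentation in this commit;
# the homeserver URL, access token and user ID are illustrative assumptions.
import requests

HOMESERVER = "https://synapse.example.com"  # assumed base URL
ADMIN_TOKEN = "<admin access token>"        # assumed admin access token
USER_ID = "@alice:example.com"              # assumed target user

resp = requests.post(
    f"{HOMESERVER}/_synapse/admin/v1/users/{USER_ID}"
    "/_allow_cross_signing_replacement_without_uia",
    headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
    json={},
)
if resp.status_code == 404:
    # Unknown user, or the user has no master cross-signing key.
    print("Replacement window could not be granted")
else:
    resp.raise_for_status()
    # Millisecond timestamp before which the key may be replaced without UIA.
    print(resp.json()["updatable_without_uia_before_ms"])
```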
## User devices

File diff suppressed because it is too large.


@ -66,7 +66,7 @@ database:
args:
user: <user>
password: <pass>
-database: <db>
+dbname: <db>
host: <host>
cp_min: 5
cp_max: 10


@ -1447,7 +1447,7 @@ database:
args:
user: synapse_user
password: secretpassword
-database: synapse
+dbname: synapse
host: localhost
port: 5432
cp_min: 5
@ -1526,7 +1526,7 @@ databases:
args:
user: synapse_user
password: secretpassword
-database: synapse_main
+dbname: synapse_main
host: localhost
port: 5432
cp_min: 5
@ -1539,7 +1539,7 @@ databases:
args:
user: synapse_user
password: secretpassword
-database: synapse_state
+dbname: synapse_state
host: localhost
port: 5432
cp_min: 5
@ -1753,6 +1753,19 @@ rc_third_party_invite:
burst_count: 10
```
---
### `rc_media_create`
This option ratelimits creation of MXC URIs via the `/_matrix/media/v1/create`
endpoint based on the account that's creating the media. Defaults to
`per_second: 10`, `burst_count: 50`.
Example configuration:
```yaml
rc_media_create:
per_second: 10
burst_count: 50
```
---
### `rc_federation`
Defines limits on federation requests.
@ -1814,6 +1827,27 @@ Example configuration:
media_store_path: "DATADIR/media_store"
```
---
### `max_pending_media_uploads`
How many *pending media uploads* can a given user have? A pending media upload
is a created MXC URI that (a) is not expired (the `unused_expires_at` timestamp
has not passed) and (b) the media has not yet been uploaded for. Defaults to 5.
Example configuration:
```yaml
max_pending_media_uploads: 5
```
---
### `unused_expiration_time`
How long to wait in milliseconds before expiring created media IDs. Defaults to
"24h"
Example configuration:
```yaml
unused_expiration_time: "1h"
```
---
### `media_storage_providers`
Media storage providers allow media to be stored in different
@ -4219,6 +4253,9 @@ outbound_federation_restricted_to:
Also see the [worker
documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers)
for more info.
+_Added in Synapse 1.89.0._
---
### `run_background_tasks_on`

poetry.lock generated

@ -416,19 +416,6 @@ files = [
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
-[[package]]
-name = "click-default-group"
-version = "1.2.2"
-description = "Extends click.Group to invoke a command without explicit subcommand name"
-optional = false
-python-versions = "*"
-files = [
-{file = "click-default-group-1.2.2.tar.gz", hash = "sha256:d9560e8e8dfa44b3562fbc9425042a0fd6d21956fcc2db0077f63f34253ab904"},
-]
-[package.dependencies]
-click = "*"
[[package]]
name = "colorama"
version = "0.4.6"
@ -1742,13 +1729,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.2.2)", "pytest (>=7.2.1)", "pytes
[[package]]
name = "prometheus-client"
-version = "0.17.1"
+version = "0.18.0"
description = "Python client for the Prometheus monitoring system."
optional = false
-python-versions = ">=3.6"
+python-versions = ">=3.8"
files = [
-{file = "prometheus_client-0.17.1-py3-none-any.whl", hash = "sha256:e537f37160f6807b8202a6fc4764cdd19bac5480ddd3e0d463c3002b34462101"},
+{file = "prometheus_client-0.18.0-py3-none-any.whl", hash = "sha256:8de3ae2755f890826f4b6479e5571d4f74ac17a81345fe69a6778fdb92579184"},
-{file = "prometheus_client-0.17.1.tar.gz", hash = "sha256:21e674f39831ae3f8acde238afd9a27a37d0d2fb5a28ea094f0ce25d2cbf2091"},
+{file = "prometheus_client-0.18.0.tar.gz", hash = "sha256:35f7a8c22139e2bb7ca5a698e92d38145bc8dc74c1c0bf56f25cca886a764e17"},
]
[package.extras]
@ -2906,18 +2893,17 @@ files = [
[[package]]
name = "towncrier"
-version = "23.6.0"
+version = "23.11.0"
description = "Building newsfiles for your project."
optional = false
-python-versions = ">=3.7"
+python-versions = ">=3.8"
files = [
-{file = "towncrier-23.6.0-py3-none-any.whl", hash = "sha256:da552f29192b3c2b04d630133f194c98e9f14f0558669d427708e203fea4d0a5"},
+{file = "towncrier-23.11.0-py3-none-any.whl", hash = "sha256:2e519ca619426d189e3c98c99558fe8be50c9ced13ea1fc20a4a353a95d2ded7"},
-{file = "towncrier-23.6.0.tar.gz", hash = "sha256:fc29bd5ab4727c8dacfbe636f7fb5dc53b99805b62da1c96b214836159ff70c1"},
+{file = "towncrier-23.11.0.tar.gz", hash = "sha256:13937c247e3f8ae20ac44d895cf5f96a60ad46cfdcc1671759530d7837d9ee5d"},
]
[package.dependencies]
click = "*"
-click-default-group = "*"
importlib-resources = {version = ">=5", markers = "python_version < \"3.10\""}
incremental = "*"
jinja2 = "*"
@ -2928,13 +2914,13 @@ dev = ["furo", "packaging", "sphinx (>=5)", "twisted"]
[[package]]
name = "treq"
-version = "22.2.0"
+version = "23.11.0"
description = "High-level Twisted HTTP Client API"
optional = false
python-versions = ">=3.6"
files = [
-{file = "treq-22.2.0-py3-none-any.whl", hash = "sha256:27d95b07c5c14be3e7b280416139b036087617ad5595be913b1f9b3ce981b9b2"},
+{file = "treq-23.11.0-py3-none-any.whl", hash = "sha256:f494c2218d61cab2cabbee37cd6606d3eea9d16cf14190323095c95d22c467e9"},
-{file = "treq-22.2.0.tar.gz", hash = "sha256:df757e3f141fc782ede076a604521194ffcb40fa2645cf48e5a37060307f52ec"},
+{file = "treq-23.11.0.tar.gz", hash = "sha256:0914ff929fd1632ce16797235260f8bc19d20ff7c459c1deabd65b8c68cbeac5"},
]
[package.dependencies]
@ -2942,11 +2928,11 @@ attrs = "*"
hyperlink = ">=21.0.0"
incremental = "*"
requests = ">=2.1.0"
-Twisted = {version = ">=18.7.0", extras = ["tls"]}
+Twisted = {version = ">=22.10.0", extras = ["tls"]}
[package.extras]
-dev = ["httpbin (==0.5.0)", "pep8", "pyflakes"]
+dev = ["httpbin (==0.7.0)", "pep8", "pyflakes", "werkzeug (==2.0.3)"]
-docs = ["sphinx (>=1.4.8)"]
+docs = ["sphinx (<7.0.0)"]
[[package]]
name = "twine"
@ -3047,13 +3033,13 @@ twisted = "*"
[[package]]
name = "types-bleach"
-version = "6.1.0.0"
+version = "6.1.0.1"
description = "Typing stubs for bleach"
optional = false
python-versions = ">=3.7"
files = [
-{file = "types-bleach-6.1.0.0.tar.gz", hash = "sha256:3cf0e55d4618890a00af1151f878b2e2a7a96433850b74e12bede7663d774532"},
+{file = "types-bleach-6.1.0.1.tar.gz", hash = "sha256:1e43c437e734a90efe4f40ebfe831057599568d3b275939ffbd6094848a18a27"},
-{file = "types_bleach-6.1.0.0-py3-none-any.whl", hash = "sha256:f0bc75d0f6475036ac69afebf37c41d116dfba78dae55db80437caf0fcd35c28"},
+{file = "types_bleach-6.1.0.1-py3-none-any.whl", hash = "sha256:f83f80e0709f13d809a9c79b958a1089df9b99e68059287beb196e38967e4ddf"},
]
[[package]]
@ -3127,13 +3113,13 @@ files = [
[[package]]
name = "types-pyopenssl"
-version = "23.2.0.2"
+version = "23.3.0.0"
description = "Typing stubs for pyOpenSSL"
optional = false
-python-versions = "*"
+python-versions = ">=3.7"
files = [
-{file = "types-pyOpenSSL-23.2.0.2.tar.gz", hash = "sha256:6a010dac9ecd42b582d7dd2cc3e9e40486b79b3b64bb2fffba1474ff96af906d"},
+{file = "types-pyOpenSSL-23.3.0.0.tar.gz", hash = "sha256:5ffb077fe70b699c88d5caab999ae80e192fe28bf6cda7989b7e79b1e4e2dcd3"},
-{file = "types_pyOpenSSL-23.2.0.2-py3-none-any.whl", hash = "sha256:19536aa3debfbe25a918cf0d898e9f5fbbe6f3594a429da7914bf331deb1b342"},
+{file = "types_pyOpenSSL-23.3.0.0-py3-none-any.whl", hash = "sha256:00171433653265843b7469ddb9f3c86d698668064cc33ef10537822156130ebf"},
]
[package.dependencies]


@ -192,7 +192,7 @@ phonenumbers = ">=8.2.0"
# we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
prometheus-client = ">=0.4.0"
# we use `order`, which arrived in attrs 19.2.0.
-# Note: 21.1.0 broke `/sync`, see #9936
+# Note: 21.1.0 broke `/sync`, see https://github.com/matrix-org/synapse/issues/9936
attrs = ">=19.2.0,!=21.1.0"
netaddr = ">=0.7.18"
# Jinja 2.x is incompatible with MarkupSafe>=2.1. To ensure that admins do not
@ -357,7 +357,7 @@ commonmark = ">=0.9.1"
pygithub = ">=1.55"
# The following are executed as commands by the release script.
twine = "*"
-# Towncrier min version comes from #3425. Rationale unclear.
+# Towncrier min version comes from https://github.com/matrix-org/synapse/pull/3425. Rationale unclear.
towncrier = ">=18.6.0rc1"
# Used for checking the Poetry lockfile
@ -377,8 +377,9 @@ furo = ">=2022.12.7,<2024.0.0"
[build-system]
# The upper bounds here are defensive, intended to prevent situations like
-# #13849 and #14079 where we see buildtime or runtime errors caused by build
-# system changes.
+# https://github.com/matrix-org/synapse/issues/13849 and
+# https://github.com/matrix-org/synapse/issues/14079 where we see buildtime or
+# runtime errors caused by build system changes.
# We are happy to raise these upper bounds upon request,
# provided we check that it's safe to do so (i.e. that CI passes).
requires = ["poetry-core>=1.1.0,<=1.7.0", "setuptools_rust>=1.3,<=1.8.1"]


@ -83,6 +83,8 @@ class Codes(str, Enum):
USER_DEACTIVATED = "M_USER_DEACTIVATED"
# USER_LOCKED = "M_USER_LOCKED"
USER_LOCKED = "ORG_MATRIX_MSC3939_USER_LOCKED"
+NOT_YET_UPLOADED = "M_NOT_YET_UPLOADED"
+CANNOT_OVERWRITE_MEDIA = "M_CANNOT_OVERWRITE_MEDIA"
# Part of MSC3848
# https://github.com/matrix-org/matrix-spec-proposals/pull/3848


@ -104,8 +104,8 @@ logger = logging.getLogger("synapse.app.generic_worker")
class GenericWorkerStore(
-# FIXME(#3714): We need to add UserDirectoryStore as we write directly
-# rather than going via the correct worker.
+# FIXME(https://github.com/matrix-org/synapse/issues/3714): We need to add
+# UserDirectoryStore as we write directly rather than going via the correct worker.
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,


@ -204,3 +204,10 @@ class RatelimitConfig(Config):
"rc_third_party_invite", "rc_third_party_invite",
defaults={"per_second": 0.0025, "burst_count": 5}, defaults={"per_second": 0.0025, "burst_count": 5},
) )
# Ratelimit create media requests:
self.rc_media_create = RatelimitSettings.parse(
config,
"rc_media_create",
defaults={"per_second": 10, "burst_count": 50},
)


@ -141,6 +141,12 @@ class ContentRepositoryConfig(Config):
"prevent_media_downloads_from", [] "prevent_media_downloads_from", []
) )
self.unused_expiration_time = self.parse_duration(
config.get("unused_expiration_time", "24h")
)
self.max_pending_media_uploads = config.get("max_pending_media_uploads", 5)
self.media_store_path = self.ensure_directory(
config.get("media_store_path", "media_store")
)


@ -581,14 +581,14 @@ class FederationSender(AbstractFederationSender):
"get_joined_hosts", str(sg) "get_joined_hosts", str(sg)
) )
if destinations is None: if destinations is None:
# Add logging to help track down #13444 # Add logging to help track down https://github.com/matrix-org/synapse/issues/13444
logger.info( logger.info(
"Unexpectedly did not have cached destinations for %s / %s", "Unexpectedly did not have cached destinations for %s / %s",
sg, sg,
event.event_id, event.event_id,
) )
else: else:
# Add logging to help track down #13444 # Add logging to help track down https://github.com/matrix-org/synapse/issues/13444
logger.info( logger.info(
"Unexpectedly did not have cached prev group for %s", "Unexpectedly did not have cached prev group for %s",
event.event_id, event.event_id,


@ -396,15 +396,17 @@ class DeviceWorkerHandler:
up_to_stream_id = task.params["up_to_stream_id"]
# Delete the messages in batches to avoid too much DB load.
+from_stream_id = None
while True:
-res = await self.store.delete_messages_for_device(
+from_stream_id, _ = await self.store.delete_messages_for_device_between(
user_id=user_id,
device_id=device_id,
-up_to_stream_id=up_to_stream_id,
+from_stream_id=from_stream_id,
+to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
-if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+if from_stream_id is None:
return TaskStatus.COMPLETE, None, None
await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)


@ -1450,19 +1450,25 @@ class E2eKeysHandler:
return desired_key_data
-async def is_cross_signing_set_up_for_user(self, user_id: str) -> bool:
+async def check_cross_signing_setup(self, user_id: str) -> Tuple[bool, bool]:
"""Checks if the user has cross-signing set up
Args:
user_id: The user to check
-Returns:
-True if the user has cross-signing set up, False otherwise
+Returns: a 2-tuple of booleans
+- whether the user has cross-signing set up, and
+- whether the user's master cross-signing key may be replaced without UIA.
"""
-existing_master_key = await self.store.get_e2e_cross_signing_key(
-user_id, "master"
-)
-return existing_master_key is not None
+(
+exists,
+ts_replacable_without_uia_before,
+) = await self.store.get_master_cross_signing_key_updatable_before(user_id)
+if ts_replacable_without_uia_before is None:
+return exists, False
+else:
+return exists, self.clock.time_msec() < ts_replacable_without_uia_before
def _check_cross_signing_key(


@ -88,7 +88,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter, partition
+from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@ -748,7 +748,7 @@ class FederationEventHandler:
# fetching fresh state for the room if the missing event
# can't be found, which slightly reduces our security.
# it may also increase our DAG extremity count for the room,
-# causing additional state resolution? See #1760.
+# causing additional state resolution? See https://github.com/matrix-org/synapse/issues/1760.
# However, fetching state doesn't hold the linearizer lock
# apparently.
#
@ -1669,14 +1669,13 @@ class FederationEventHandler:
# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
-while event_map:
-# build a list of events whose auth events are not in the queue.
-roots = tuple(
-ev
-for ev in event_map.values()
-if not any(aid in event_map for aid in ev.auth_event_ids())
-)
+# We need to persist an event's auth events before the event.
+auth_graph = {
+ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
+for ev in event_map.values()
+}
+for roots in sorted_topologically_batched(event_map.values(), auth_graph):
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
@ -1698,9 +1697,6 @@ class FederationEventHandler:
await self._auth_and_persist_outliers_inner(room_id, roots)
-for ev in roots:
-del event_map[ev.event_id]
async def _auth_and_persist_outliers_inner(
self, room_id: str, fetched_events: Collection[EventBase]
) -> None:


@ -1816,7 +1816,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# the same token repeatedly.
#
# Hence this guard where we just return nothing so that the sync
-# doesn't return. C.f. #5503.
+# doesn't return. C.f. https://github.com/matrix-org/synapse/issues/5503.
return [], max_token
# Figure out which other users this user should explicitly receive


@ -399,7 +399,7 @@ class SyncHandler:
#
# If that happens, we mustn't cache it, so that when the client comes back
# with the same cache token, we don't immediately return the same empty
-# result, causing a tightloop. (#8518)
+# result, causing a tightloop. (https://github.com/matrix-org/synapse/issues/8518)
if result.next_batch == since_token:
cache_context.should_cache = False
@ -1003,7 +1003,7 @@ class SyncHandler:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
-# LL for incr syncs in #3840.
+# LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
# We don't insert ourselves into `members_to_fetch`, because in some
# rare cases (an empty event batch with a now_token after the user's
# leave in a partial state room which another local user has


@ -184,8 +184,8 @@ class UserDirectoryHandler(StateDeltasHandler):
"""Called to update index of our local user profiles when they change """Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in. irrespective of any rooms the user may be in.
""" """
# FIXME(#3714): We should probably do this in the same worker as all # FIXME(https://github.com/matrix-org/synapse/issues/3714): We should
# the other changes. # probably do this in the same worker as all the other changes.
if await self.store.should_include_local_user_in_dir(user_id): if await self.store.should_include_local_user_in_dir(user_id):
await self.store.update_profile_in_user_dir( await self.store.update_profile_in_user_dir(
@ -194,8 +194,8 @@ class UserDirectoryHandler(StateDeltasHandler):
async def handle_local_user_deactivated(self, user_id: str) -> None:
"""Called when a user ID is deactivated"""
-# FIXME(#3714): We should probably do this in the same worker as all
-# the other changes.
+# FIXME(https://github.com/matrix-org/synapse/issues/3714): We should
+# probably do this in the same worker as all the other changes.
await self.store.remove_from_user_dir(user_id)
async def _unsafe_process(self) -> None:


@ -465,7 +465,7 @@ class MatrixFederationHttpClient:
"""Wrapper for _send_request which can optionally retry the request """Wrapper for _send_request which can optionally retry the request
upon receiving a combination of a 400 HTTP response code and a upon receiving a combination of a 400 HTTP response code and a
'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
due to #3622. due to https://github.com/matrix-org/synapse/issues/3622.
Args: Args:
request: details of request to be sent request: details of request to be sent
@ -958,9 +958,9 @@ class MatrixFederationHttpClient:
requests).
try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
response we should try appending a trailing slash to the end
-of the request. Workaround for #3622 in Synapse <= v0.99.3. This
-will be attempted before backing off if backing off has been
-enabled.
+of the request. Workaround for https://github.com/matrix-org/synapse/issues/3622
+in Synapse <= v0.99.3. This will be attempted before backing off if
+backing off has been enabled.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
backoff_on_all_error_codes: Back off if we get any error response
@ -1155,7 +1155,8 @@ class MatrixFederationHttpClient:
try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
response we should try appending a trailing slash to the end of
-the request. Workaround for #3622 in Synapse <= v0.99.3.
+the request. Workaround for https://github.com/matrix-org/synapse/issues/3622
+in Synapse <= v0.99.3.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
@ -1250,7 +1251,8 @@ class MatrixFederationHttpClient:
try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
response we should try appending a trailing slash to the end of
-the request. Workaround for #3622 in Synapse <= v0.99.3.
+the request. Workaround for https://github.com/matrix-org/synapse/issues/3622
+in Synapse <= v0.99.3.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.


@ -83,6 +83,12 @@ INLINE_CONTENT_TYPES = [
"audio/x-flac", "audio/x-flac",
] ]
# Default timeout_ms for download and thumbnail requests
DEFAULT_MAX_TIMEOUT_MS = 20_000
# Maximum allowed timeout_ms for download and thumbnail requests
MAXIMUM_ALLOWED_MAX_TIMEOUT_MS = 60_000
def respond_404(request: SynapseRequest) -> None:
assert request.path is not None


@ -27,13 +27,16 @@ import twisted.web.http
from twisted.internet.defer import Deferred
from synapse.api.errors import (
+Codes,
FederationDeniedError,
HttpResponseException,
NotFoundError,
RequestSendFailed,
SynapseError,
+cs_error,
)
from synapse.config.repository import ThumbnailRequirement
+from synapse.http.server import respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
from synapse.logging.opentracing import trace
@ -51,7 +54,7 @@ from synapse.media.storage_provider import StorageProviderWrapper
from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
from synapse.media.url_previewer import UrlPreviewer
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.databases.main.media_repository import RemoteMedia
+from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
from synapse.types import UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
@ -80,6 +83,8 @@ class MediaRepository:
self.store = hs.get_datastores().main
self.max_upload_size = hs.config.media.max_upload_size
self.max_image_pixels = hs.config.media.max_image_pixels
+self.unused_expiration_time = hs.config.media.unused_expiration_time
+self.max_pending_media_uploads = hs.config.media.max_pending_media_uploads
Thumbnailer.set_limits(self.max_image_pixels)
@ -185,6 +190,117 @@ class MediaRepository:
else:
self.recently_accessed_locals.add(media_id)
@trace
async def create_media_id(self, auth_user: UserID) -> Tuple[str, int]:
"""Create and store a media ID for a local user and return the MXC URI and its
expiration.
Args:
auth_user: The user_id of the uploader
Returns:
A tuple containing the MXC URI of the stored content and the timestamp at
which the MXC URI expires.
"""
media_id = random_string(24)
now = self.clock.time_msec()
await self.store.store_local_media_id(
media_id=media_id,
time_now_ms=now,
user_id=auth_user,
)
return f"mxc://{self.server_name}/{media_id}", now + self.unused_expiration_time
@trace
async def reached_pending_media_limit(self, auth_user: UserID) -> Tuple[bool, int]:
"""Check if the user is over the limit for pending media uploads.
Args:
auth_user: The user_id of the uploader
Returns:
A tuple with a boolean and an integer indicating whether the user has too
many pending media uploads and the timestamp at which the first pending
media will expire, respectively.
"""
pending, first_expiration_ts = await self.store.count_pending_media(
user_id=auth_user
)
return pending >= self.max_pending_media_uploads, first_expiration_ts
@trace
async def verify_can_upload(self, media_id: str, auth_user: UserID) -> None:
"""Verify that the media ID can be uploaded to by the given user. This
function checks that:
* the media ID exists
* the media ID does not already have content
* the user uploading is the same as the one who created the media ID
* the media ID has not expired
Args:
media_id: The media ID to verify
auth_user: The user_id of the uploader
"""
media = await self.store.get_local_media(media_id)
if media is None:
raise SynapseError(404, "Unknown media ID", errcode=Codes.NOT_FOUND)
if media.user_id != auth_user.to_string():
raise SynapseError(
403,
"Only the creator of the media ID can upload to it",
errcode=Codes.FORBIDDEN,
)
if media.media_length is not None:
raise SynapseError(
409,
"Media ID already has content",
errcode=Codes.CANNOT_OVERWRITE_MEDIA,
)
expired_time_ms = self.clock.time_msec() - self.unused_expiration_time
if media.created_ts < expired_time_ms:
raise NotFoundError("Media ID has expired")
@trace
async def update_content(
self,
media_id: str,
media_type: str,
upload_name: Optional[str],
content: IO,
content_length: int,
auth_user: UserID,
) -> None:
"""Update the content of the given media ID.
Args:
media_id: The media ID to replace.
media_type: The content type of the file.
upload_name: The name of the file, if provided.
content: A file like object that is the content to store
content_length: The length of the content
auth_user: The user_id of the uploader
"""
file_info = FileInfo(server_name=None, file_id=media_id)
fname = await self.media_storage.store_file(content, file_info)
logger.info("Stored local media in file %r", fname)
await self.store.update_local_media(
media_id=media_id,
media_type=media_type,
upload_name=upload_name,
media_length=content_length,
user_id=auth_user,
)
try:
await self._generate_thumbnails(None, media_id, media_id, media_type)
except Exception as e:
logger.info("Failed to generate thumbnails: %s", e)
@trace
async def create_content(
self,
@ -231,8 +347,74 @@ class MediaRepository:
return MXCUri(self.server_name, media_id)
def respond_not_yet_uploaded(self, request: SynapseRequest) -> None:
respond_with_json(
request,
504,
cs_error("Media has not been uploaded yet", code=Codes.NOT_YET_UPLOADED),
send_cors=True,
)
async def get_local_media_info(
self, request: SynapseRequest, media_id: str, max_timeout_ms: int
) -> Optional[LocalMedia]:
"""Gets the info dictionary for given local media ID. If the media has
not been uploaded yet, this function will wait up to ``max_timeout_ms``
milliseconds for the media to be uploaded.
Args:
request: The incoming request.
media_id: The media ID of the content. (This is the same as
the file_id for local content.)
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
Either the info dictionary for the given local media ID or
``None``. If ``None``, then no further processing is necessary as
this function will send the necessary JSON response.
"""
wait_until = self.clock.time_msec() + max_timeout_ms
while True:
# Get the info for the media
media_info = await self.store.get_local_media(media_id)
if not media_info:
logger.info("Media %s is unknown", media_id)
respond_404(request)
return None
if media_info.quarantined_by:
logger.info("Media %s is quarantined", media_id)
respond_404(request)
return None
# The file has been uploaded, so stop looping
if media_info.media_length is not None:
return media_info
# Check if the media ID has expired and still hasn't been uploaded to.
now = self.clock.time_msec()
expired_time_ms = now - self.unused_expiration_time
if media_info.created_ts < expired_time_ms:
logger.info("Media %s has expired without being uploaded", media_id)
respond_404(request)
return None
if now >= wait_until:
break
await self.clock.sleep(0.5)
logger.info("Media %s has not yet been uploaded", media_id)
self.respond_not_yet_uploaded(request)
return None
async def get_local_media(
-self, request: SynapseRequest, media_id: str, name: Optional[str]
+self,
+request: SynapseRequest,
+media_id: str,
+name: Optional[str],
+max_timeout_ms: int,
) -> None:
"""Responds to requests for local media, if exists, or returns 404.
@ -242,13 +424,14 @@ class MediaRepository:
the file_id for local content.)
name: Optional name that, if specified, will be used as
the filename in the Content-Disposition header of the response.
+max_timeout_ms: the maximum number of milliseconds to wait for the
+media to be uploaded.
Returns:
Resolves once a response has successfully been written to request
"""
-media_info = await self.store.get_local_media(media_id)
+media_info = await self.get_local_media_info(request, media_id, max_timeout_ms)
-if not media_info or media_info.quarantined_by:
-respond_404(request)
+if not media_info:
return
self.mark_recently_accessed(None, media_id)
@ -273,6 +456,7 @@ class MediaRepository:
server_name: str,
media_id: str,
name: Optional[str],
+max_timeout_ms: int,
) -> None:
"""Respond to requests for remote media.
@ -282,6 +466,8 @@ class MediaRepository:
media_id: The media ID of the content (as defined by the remote server).
name: Optional name that, if specified, will be used as
the filename in the Content-Disposition header of the response.
+max_timeout_ms: the maximum number of milliseconds to wait for the
+media to be uploaded.
Returns:
Resolves once a response has successfully been written to request
@ -307,11 +493,11 @@ class MediaRepository:
key = (server_name, media_id)
async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
-server_name, media_id
+server_name, media_id, max_timeout_ms
)
# We deliberately stream the file outside the lock
-if responder:
+if responder and media_info:
upload_name = name if name else media_info.upload_name
await respond_with_responder(
request,
@ -324,7 +510,7 @@ class MediaRepository:
respond_404(request)
async def get_remote_media_info(
-self, server_name: str, media_id: str
+self, server_name: str, media_id: str, max_timeout_ms: int
) -> RemoteMedia:
"""Gets the media info associated with the remote file, downloading
if necessary.
@ -332,6 +518,8 @@ class MediaRepository:
Args:
server_name: Remote server_name where the media originated.
media_id: The media ID of the content (as defined by the remote server).
+max_timeout_ms: the maximum number of milliseconds to wait for the
+media to be uploaded.
Returns:
The media info of the file
@ -347,7 +535,7 @@ class MediaRepository:
key = (server_name, media_id)
async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
-server_name, media_id
+server_name, media_id, max_timeout_ms
)
# Ensure we actually use the responder so that it releases resources
@ -358,7 +546,7 @@ class MediaRepository:
return media_info
async def _get_remote_media_impl(
-self, server_name: str, media_id: str
+self, server_name: str, media_id: str, max_timeout_ms: int
) -> Tuple[Optional[Responder], RemoteMedia]:
"""Looks for media in local cache, if not there then attempt to
download from remote server.
@ -367,6 +555,8 @@ class MediaRepository:
server_name: Remote server_name where the media originated.
media_id: The media ID of the content (as defined by the
remote server).
+max_timeout_ms: the maximum number of milliseconds to wait for the
+media to be uploaded.
Returns:
A tuple of responder and the media info of the file.
@ -399,8 +589,7 @@ class MediaRepository:
try:
media_info = await self._download_remote_file(
-server_name,
-media_id,
+server_name, media_id, max_timeout_ms
)
except SynapseError:
raise
@ -433,6 +622,7 @@ class MediaRepository:
self,
server_name: str,
media_id: str,
+max_timeout_ms: int,
) -> RemoteMedia:
"""Attempt to download the remote file from the given server name,
using the given file_id as the local id.
@ -442,7 +632,8 @@ class MediaRepository:
media_id: The media ID of the content (as defined by the
remote server). This is different than the file_id, which is
locally generated.
-file_id: Local file ID
+max_timeout_ms: the maximum number of milliseconds to wait for the
+media to be uploaded.
Returns:
The media info of the file.
@ -466,7 +657,8 @@ class MediaRepository:
# tell the remote server to 404 if it doesn't
# recognise the server_name, to make sure we don't
# end up with a routing loop.
-"allow_remote": "false"
+"allow_remote": "false",
+"timeout_ms": str(max_timeout_ms),
},
)
except RequestSendFailed as e:


@ -295,7 +295,8 @@ class ThirdPartyEventRulesModuleApiCallbacks:
raise
except SynapseError as e:
# FIXME: Being able to throw SynapseErrors is relied upon by
-# some modules. PR #10386 accidentally broke this ability.
+# some modules. PR https://github.com/matrix-org/synapse/pull/10386
+# accidentally broke this ability.
# That said, we aren't keen on exposing this implementation detail
# to modules and we should one day have a proper way to do what
# is wanted.


@ -257,6 +257,11 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)
# Marks if we should send POSITION commands for all streams ASAP. This
# is checked by the `ReplicationStreamer` which manages sending
# RDATA/POSITION commands
self._should_announce_positions = True
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@ -397,29 +402,23 @@ class ReplicationCommandHandler:
return self._streams_to_replicate
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
-self.send_positions_to_connection(conn)
+self.send_positions_to_connection()
-def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
+def send_positions_to_connection(self) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
-# We respond with current position of all streams this instance
-# replicates.
-for stream in self.get_streams_to_replicate():
-# Note that we use the current token as the prev token here (rather
-# than stream.last_token), as we can't be sure that there have been
-# no rows written between last token and the current token (since we
-# might be racing with the replication sending bg process).
-current_token = stream.current_token(self._instance_name)
-self.send_command(
-PositionCommand(
-stream.NAME,
-self._instance_name,
-current_token,
-current_token,
-)
-)
+self._should_announce_positions = True
+self._notifier.notify_replication()
+def should_announce_positions(self) -> bool:
+"""Check if we should send POSITION commands for all streams ASAP."""
+return self._should_announce_positions
+def will_announce_positions(self) -> None:
+"""Mark that we're about to send POSITIONs out for all streams."""
+self._should_announce_positions = False
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
@ -588,6 +587,21 @@ class ReplicationCommandHandler:
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
# Check if we can early discard this position. We can only do so for
# connected streams.
stream = self._streams[cmd.stream_name]
if stream.can_discard_position(
cmd.instance_name, cmd.prev_token, cmd.new_token
) and self.is_stream_connected(conn, cmd.stream_name):
logger.debug(
"Discarding redundant POSITION %s/%s %s %s",
cmd.instance_name,
cmd.stream_name,
cmd.prev_token,
cmd.new_token,
)
return
self._add_command_to_stream_queue(conn, cmd)
async def _process_position( async def _process_position(
@ -599,6 +613,18 @@ class ReplicationCommandHandler:
""" """
stream = self._streams[stream_name] stream = self._streams[stream_name]
if stream.can_discard_position(
cmd.instance_name, cmd.prev_token, cmd.new_token
) and self.is_stream_connected(conn, cmd.stream_name):
logger.debug(
"Discarding redundant POSITION %s/%s %s %s",
cmd.instance_name,
cmd.stream_name,
cmd.prev_token,
cmd.new_token,
)
return
# We're about to go and catch up with the stream, so remove from set
# of connected streams.
for streams in self._streams_by_connection.values():
@ -626,8 +652,9 @@ class ReplicationCommandHandler:
# for why this can happen.
logger.info(
-"Fetching replication rows for '%s' between %i and %i",
+"Fetching replication rows for '%s' / %s between %i and %i",
stream_name,
+cmd.instance_name,
current_token,
cmd.new_token,
)
@ -657,6 +684,13 @@ class ReplicationCommandHandler:
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
def is_stream_connected(
self, conn: IReplicationConnection, stream_name: str
) -> bool:
"""Return if stream has been successfully connected and is ready to
receive updates"""
return stream_name in self._streams_by_connection.get(conn, ())
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
) -> None:


@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# otherside won't know we've connected and so won't issue a REPLICATE.
-self.synapse_handler.send_positions_to_connection(self)
+self.synapse_handler.send_positions_to_connection()
def messageReceived(self, pattern: str, channel: str, message: str) -> None:
"""Received a message from redis."""


@ -123,7 +123,7 @@ class ReplicationStreamer:
# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
-if all(
+if not self.command_handler.should_announce_positions() and all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
@ -158,6 +158,21 @@ class ReplicationStreamer:
all_streams = list(all_streams)
random.shuffle(all_streams)
if self.command_handler.should_announce_positions():
# We need to send out POSITIONs for all streams, usually
# because a worker has reconnected.
self.command_handler.will_announce_positions()
for stream in all_streams:
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.last_token,
stream.last_token,
)
)
for stream in all_streams:
if stream.last_token == stream.current_token(
self._instance_name


@ -144,6 +144,16 @@ class Stream:
""" """
raise NotImplementedError() raise NotImplementedError()
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
"""Whether or not a position command for this stream can be discarded.
Useful for streams that can never go backwards and where we already know
the stream ID for the instance has advanced.
"""
return False
def discard_updates_and_advance(self) -> None:
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
@ -221,6 +231,14 @@ class _StreamFromIdGen(Stream):
def minimal_local_current_token(self) -> Token: def minimal_local_current_token(self) -> Token:
return self._stream_id_gen.get_minimal_local_current_token() return self._stream_id_gen.get_minimal_local_current_token()
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
# These streams can't go backwards, so we know we can ignore any
# positions where the tokens are from before the current token.
return new_token <= self.current_token(instance_name)
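
A hedged illustration of the discard rule above, with made-up token values (not taken from any real deployment): for an ID-generator-backed stream that only ever moves forward, a POSITION whose new token is at or behind the locally known current token carries no new information and can be dropped.

# Minimal sketch of the forward-only discard rule (illustrative token values only).
current_token_for_instance = 10  # what this worker already knows for the sending instance

def can_discard_position(prev_token: int, new_token: int) -> bool:
    # The stream never goes backwards, so a POSITION at or behind the known
    # current token has already been processed and can be dropped.
    return new_token <= current_token_for_instance

assert can_discard_position(7, 9) is True     # stale POSITION, safe to drop
assert can_discard_position(10, 12) is False  # advances the stream, must be handled
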
def current_token_without_instance( def current_token_without_instance(
current_token: Callable[[], int] current_token: Callable[[], int]

View File

@ -88,6 +88,7 @@ from synapse.rest.admin.users import (
UserByThreePid, UserByThreePid,
UserMembershipRestServlet, UserMembershipRestServlet,
UserRegisterServlet, UserRegisterServlet,
UserReplaceMasterCrossSigningKeyRestServlet,
UserRestServletV2, UserRestServletV2,
UsersRestServletV2, UsersRestServletV2,
UserTokenRestServlet, UserTokenRestServlet,
@ -292,6 +293,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListDestinationsRestServlet(hs).register(http_server) ListDestinationsRestServlet(hs).register(http_server)
RoomMessagesRestServlet(hs).register(http_server) RoomMessagesRestServlet(hs).register(http_server)
RoomTimestampToEventRestServlet(hs).register(http_server) RoomTimestampToEventRestServlet(hs).register(http_server)
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
UserByExternalId(hs).register(http_server) UserByExternalId(hs).register(http_server)
UserByThreePid(hs).register(http_server) UserByThreePid(hs).register(http_server)

View File

@ -1270,6 +1270,46 @@ class AccountDataRestServlet(RestServlet):
} }
class UserReplaceMasterCrossSigningKeyRestServlet(RestServlet):
"""Allow a given user to replace their master cross-signing key without UIA.
This replacement is permitted for a limited period (currently 10 minutes).
While this is exposed via the admin API, this is intended for use by the
Matrix Authentication Service rather than server admins.
"""
PATTERNS = admin_patterns(
"/users/(?P<user_id>[^/]*)/_allow_cross_signing_replacement_without_uia"
)
REPLACEMENT_PERIOD_MS = 10 * 60 * 1000 # 10 minutes
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
async def on_POST(
self,
request: SynapseRequest,
user_id: str,
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
if user_id is None:
raise NotFoundError("User not found")
timestamp = (
await self._store.allow_master_cross_signing_key_replacement_without_uia(
user_id, self.REPLACEMENT_PERIOD_MS
)
)
if timestamp is None:
raise NotFoundError("User has no master cross-signing key")
return HTTPStatus.OK, {"updatable_without_uia_before_ms": timestamp}
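
A hedged usage sketch for the servlet above, written with `requests` purely for illustration: the homeserver URL, access token and user ID are placeholders, and the path assumes the usual `/_synapse/admin/v1` prefix applied by `admin_patterns`. After a successful call, the device signing key upload path later in this commit will accept a replacement without UIA until the returned timestamp.

# Illustrative call to the new admin endpoint (placeholder host, token and user ID).
import requests

resp = requests.post(
    "https://homeserver.example.com/_synapse/admin/v1/users/"
    "@alice:example.com/_allow_cross_signing_replacement_without_uia",
    headers={"Authorization": "Bearer <admin-or-MAS-access-token>"},
)
resp.raise_for_status()
# e.g. {"updatable_without_uia_before_ms": 1700000600000}
print(resp.json()["updatable_without_uia_before_ms"])
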
class UserByExternalId(RestServlet): class UserByExternalId(RestServlet):
"""Find a user based on an external ID from an auth provider""" """Find a user based on an external ID from an auth provider"""

View File

@ -299,19 +299,16 @@ class DeactivateAccountRestServlet(RestServlet):
requester = await self.auth.get_user_by_req(request) requester = await self.auth.get_user_by_req(request)
# allow ASes to deactivate their own users # allow ASes to deactivate their own users:
if requester.app_service: # ASes don't need user-interactive auth
await self._deactivate_account_handler.deactivate_account( if not requester.app_service:
requester.user.to_string(), body.erase, requester
)
return 200, {}
await self.auth_handler.validate_user_via_ui_auth( await self.auth_handler.validate_user_via_ui_auth(
requester, requester,
request, request,
body.dict(exclude_unset=True), body.dict(exclude_unset=True),
"deactivate your account", "deactivate your account",
) )
result = await self._deactivate_account_handler.deactivate_account( result = await self._deactivate_account_handler.deactivate_account(
requester.user.to_string(), body.erase, requester, id_server=body.id_server requester.user.to_string(), body.erase, requester, id_server=body.id_server
) )

View File

@ -376,9 +376,10 @@ class SigningKeyUploadServlet(RestServlet):
user_id = requester.user.to_string() user_id = requester.user.to_string()
body = parse_json_object_from_request(request) body = parse_json_object_from_request(request)
is_cross_signing_setup = ( (
await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id) is_cross_signing_setup,
) master_key_updatable_without_uia,
) = await self.e2e_keys_handler.check_cross_signing_setup(user_id)
# Before MSC3967 we required UIA both when setting up cross signing for the # Before MSC3967 we required UIA both when setting up cross signing for the
# first time and when resetting the device signing key. With MSC3967 we only # first time and when resetting the device signing key. With MSC3967 we only
@ -386,9 +387,14 @@ class SigningKeyUploadServlet(RestServlet):
# time. Because there is no UIA in MSC3861, for now we throw an error if the # time. Because there is no UIA in MSC3861, for now we throw an error if the
# user tries to reset the device signing key when MSC3861 is enabled, but allow # user tries to reset the device signing key when MSC3861 is enabled, but allow
# first-time setup. # first-time setup.
#
# XXX: We now have a get-out clause by which MAS can temporarily mark the master
# key as replaceable. It should do its own equivalent of user interactive auth
# before doing so.
if self.hs.config.experimental.msc3861.enabled: if self.hs.config.experimental.msc3861.enabled:
# There is no way to reset the device signing key with MSC3861 # The auth service has to explicitly mark the master key as replaceable
if is_cross_signing_setup: # without UIA to reset the device signing key with MSC3861.
if is_cross_signing_setup and not master_key_updatable_without_uia:
raise SynapseError( raise SynapseError(
HTTPStatus.NOT_IMPLEMENTED, HTTPStatus.NOT_IMPLEMENTED,
"Resetting cross signing keys is not yet supported with MSC3861", "Resetting cross signing keys is not yet supported with MSC3861",

View File

@ -0,0 +1,83 @@
# Copyright 2023 Beeper Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from typing import TYPE_CHECKING
from synapse.api.errors import LimitExceededError
from synapse.api.ratelimiting import Ratelimiter
from synapse.http.server import respond_with_json
from synapse.http.servlet import RestServlet
from synapse.http.site import SynapseRequest
if TYPE_CHECKING:
from synapse.media.media_repository import MediaRepository
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class CreateResource(RestServlet):
PATTERNS = [re.compile("/_matrix/media/v1/create")]
def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"):
super().__init__()
self.media_repo = media_repo
self.clock = hs.get_clock()
self.auth = hs.get_auth()
self.max_pending_media_uploads = hs.config.media.max_pending_media_uploads
# A rate limiter for creating new media IDs.
self._create_media_rate_limiter = Ratelimiter(
store=hs.get_datastores().main,
clock=self.clock,
cfg=hs.config.ratelimiting.rc_media_create,
)
async def on_POST(self, request: SynapseRequest) -> None:
requester = await self.auth.get_user_by_req(request)
# If the create media requests for the user are over the limit, drop them.
await self._create_media_rate_limiter.ratelimit(requester)
(
reached_pending_limit,
first_expiration_ts,
) = await self.media_repo.reached_pending_media_limit(requester.user)
if reached_pending_limit:
raise LimitExceededError(
limiter_name="max_pending_media_uploads",
retry_after_ms=first_expiration_ts - self.clock.time_msec(),
)
content_uri, unused_expires_at = await self.media_repo.create_media_id(
requester.user
)
logger.info(
"Created Media URI %r that if unused will expire at %d",
content_uri,
unused_expires_at,
)
respond_with_json(
request,
200,
{
"content_uri": content_uri,
"unused_expires_at": unused_expires_at,
},
send_cors=True,
)
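
A hedged client-side sketch of the MSC2246 "create" step served by the resource above; the homeserver and access token are placeholders, and the response keys are the ones written by `respond_with_json` just above.

# Illustrative MSC2246 create call (placeholder homeserver and token).
import requests

resp = requests.post(
    "https://homeserver.example.com/_matrix/media/v1/create",
    headers={"Authorization": "Bearer <access-token>"},
)
resp.raise_for_status()
body = resp.json()
content_uri = body["content_uri"]        # e.g. "mxc://example.com/abc123"
expires_at = body["unused_expires_at"]   # ms timestamp; upload before this
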

View File

@ -17,9 +17,13 @@ import re
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional
from synapse.http.server import set_corp_headers, set_cors_headers from synapse.http.server import set_corp_headers, set_cors_headers
from synapse.http.servlet import RestServlet, parse_boolean from synapse.http.servlet import RestServlet, parse_boolean, parse_integer
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.media._base import respond_404 from synapse.media._base import (
DEFAULT_MAX_TIMEOUT_MS,
MAXIMUM_ALLOWED_MAX_TIMEOUT_MS,
respond_404,
)
from synapse.util.stringutils import parse_and_validate_server_name from synapse.util.stringutils import parse_and_validate_server_name
if TYPE_CHECKING: if TYPE_CHECKING:
@ -65,12 +69,16 @@ class DownloadResource(RestServlet):
) )
# Limited non-standard form of CSP for IE11 # Limited non-standard form of CSP for IE11
request.setHeader(b"X-Content-Security-Policy", b"sandbox;") request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
request.setHeader( request.setHeader(b"Referrer-Policy", b"no-referrer")
b"Referrer-Policy", max_timeout_ms = parse_integer(
b"no-referrer", request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
) )
max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
if self._is_mine_server_name(server_name): if self._is_mine_server_name(server_name):
await self.media_repo.get_local_media(request, media_id, file_name) await self.media_repo.get_local_media(
request, media_id, file_name, max_timeout_ms
)
else: else:
allow_remote = parse_boolean(request, "allow_remote", default=True) allow_remote = parse_boolean(request, "allow_remote", default=True)
if not allow_remote: if not allow_remote:
@ -83,5 +91,5 @@ class DownloadResource(RestServlet):
return return
await self.media_repo.get_remote_media( await self.media_repo.get_remote_media(
request, server_name, media_id, file_name request, server_name, media_id, file_name, max_timeout_ms
) )
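
A hedged example of the new `timeout_ms` query parameter parsed above; the server clamps it to `MAXIMUM_ALLOWED_MAX_TIMEOUT_MS`, so the client-supplied value is only an upper bound on how long the server waits for not-yet-uploaded media. Homeserver, server name and media ID below are placeholders.

# Illustrative download that waits up to 20s for an async upload to complete.
import requests

resp = requests.get(
    "https://homeserver.example.com/_matrix/media/v3/download/example.com/abc123",
    params={"timeout_ms": 20_000},
    timeout=30,
)
resp.raise_for_status()
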

View File

@ -18,10 +18,11 @@ from synapse.config._base import ConfigError
from synapse.http.server import HttpServer, JsonResource from synapse.http.server import HttpServer, JsonResource
from .config_resource import MediaConfigResource from .config_resource import MediaConfigResource
from .create_resource import CreateResource
from .download_resource import DownloadResource from .download_resource import DownloadResource
from .preview_url_resource import PreviewUrlResource from .preview_url_resource import PreviewUrlResource
from .thumbnail_resource import ThumbnailResource from .thumbnail_resource import ThumbnailResource
from .upload_resource import UploadResource from .upload_resource import AsyncUploadServlet, UploadServlet
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.server import HomeServer from synapse.server import HomeServer
@ -91,8 +92,9 @@ class MediaRepositoryResource(JsonResource):
# Note that many of these should not exist as v1 endpoints, but empirically # Note that many of these should not exist as v1 endpoints, but empirically
# a lot of traffic still goes to them. # a lot of traffic still goes to them.
CreateResource(hs, media_repo).register(http_server)
UploadResource(hs, media_repo).register(http_server) UploadServlet(hs, media_repo).register(http_server)
AsyncUploadServlet(hs, media_repo).register(http_server)
DownloadResource(hs, media_repo).register(http_server) DownloadResource(hs, media_repo).register(http_server)
ThumbnailResource(hs, media_repo, media_repo.media_storage).register( ThumbnailResource(hs, media_repo, media_repo.media_storage).register(
http_server http_server

View File

@ -23,6 +23,8 @@ from synapse.http.server import respond_with_json, set_corp_headers, set_cors_he
from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.media._base import ( from synapse.media._base import (
DEFAULT_MAX_TIMEOUT_MS,
MAXIMUM_ALLOWED_MAX_TIMEOUT_MS,
FileInfo, FileInfo,
ThumbnailInfo, ThumbnailInfo,
respond_404, respond_404,
@ -75,15 +77,19 @@ class ThumbnailResource(RestServlet):
method = parse_string(request, "method", "scale") method = parse_string(request, "method", "scale")
# TODO Parse the Accept header to get a prioritised list of thumbnail types. # TODO Parse the Accept header to get a prioritised list of thumbnail types.
m_type = "image/png" m_type = "image/png"
max_timeout_ms = parse_integer(
request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
)
max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
if self._is_mine_server_name(server_name): if self._is_mine_server_name(server_name):
if self.dynamic_thumbnails: if self.dynamic_thumbnails:
await self._select_or_generate_local_thumbnail( await self._select_or_generate_local_thumbnail(
request, media_id, width, height, method, m_type request, media_id, width, height, method, m_type, max_timeout_ms
) )
else: else:
await self._respond_local_thumbnail( await self._respond_local_thumbnail(
request, media_id, width, height, method, m_type request, media_id, width, height, method, m_type, max_timeout_ms
) )
self.media_repo.mark_recently_accessed(None, media_id) self.media_repo.mark_recently_accessed(None, media_id)
else: else:
@ -95,13 +101,20 @@ class ThumbnailResource(RestServlet):
respond_404(request) respond_404(request)
return return
if self.dynamic_thumbnails: remote_resp_function = (
await self._select_or_generate_remote_thumbnail( self._select_or_generate_remote_thumbnail
request, server_name, media_id, width, height, method, m_type if self.dynamic_thumbnails
else self._respond_remote_thumbnail
) )
else: await remote_resp_function(
await self._respond_remote_thumbnail( request,
request, server_name, media_id, width, height, method, m_type server_name,
media_id,
width,
height,
method,
m_type,
max_timeout_ms,
) )
self.media_repo.mark_recently_accessed(server_name, media_id) self.media_repo.mark_recently_accessed(server_name, media_id)
@ -113,15 +126,12 @@ class ThumbnailResource(RestServlet):
height: int, height: int,
method: str, method: str,
m_type: str, m_type: str,
max_timeout_ms: int,
) -> None: ) -> None:
media_info = await self.store.get_local_media(media_id) media_info = await self.media_repo.get_local_media_info(
request, media_id, max_timeout_ms
)
if not media_info: if not media_info:
respond_404(request)
return
if media_info.quarantined_by:
logger.info("Media is quarantined")
respond_404(request)
return return
thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
@ -146,15 +156,13 @@ class ThumbnailResource(RestServlet):
desired_height: int, desired_height: int,
desired_method: str, desired_method: str,
desired_type: str, desired_type: str,
max_timeout_ms: int,
) -> None: ) -> None:
media_info = await self.store.get_local_media(media_id) media_info = await self.media_repo.get_local_media_info(
request, media_id, max_timeout_ms
)
if not media_info: if not media_info:
respond_404(request)
return
if media_info.quarantined_by:
logger.info("Media is quarantined")
respond_404(request)
return return
thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
@ -206,8 +214,14 @@ class ThumbnailResource(RestServlet):
desired_height: int, desired_height: int,
desired_method: str, desired_method: str,
desired_type: str, desired_type: str,
max_timeout_ms: int,
) -> None: ) -> None:
media_info = await self.media_repo.get_remote_media_info(server_name, media_id) media_info = await self.media_repo.get_remote_media_info(
server_name, media_id, max_timeout_ms
)
if not media_info:
respond_404(request)
return
thumbnail_infos = await self.store.get_remote_media_thumbnails( thumbnail_infos = await self.store.get_remote_media_thumbnails(
server_name, media_id server_name, media_id
@ -263,11 +277,16 @@ class ThumbnailResource(RestServlet):
height: int, height: int,
method: str, method: str,
m_type: str, m_type: str,
max_timeout_ms: int,
) -> None: ) -> None:
# TODO: Don't download the whole remote file # TODO: Don't download the whole remote file
# We should proxy the thumbnail from the remote server instead of # We should proxy the thumbnail from the remote server instead of
# downloading the remote file and generating our own thumbnails. # downloading the remote file and generating our own thumbnails.
media_info = await self.media_repo.get_remote_media_info(server_name, media_id) media_info = await self.media_repo.get_remote_media_info(
server_name, media_id, max_timeout_ms
)
if not media_info:
return
thumbnail_infos = await self.store.get_remote_media_thumbnails( thumbnail_infos = await self.store.get_remote_media_thumbnails(
server_name, media_id server_name, media_id

View File

@ -15,7 +15,7 @@
import logging import logging
import re import re
from typing import IO, TYPE_CHECKING, Dict, List, Optional from typing import IO, TYPE_CHECKING, Dict, List, Optional, Tuple
from synapse.api.errors import Codes, SynapseError from synapse.api.errors import Codes, SynapseError
from synapse.http.server import respond_with_json from synapse.http.server import respond_with_json
@ -29,23 +29,24 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# The name of the lock to use when uploading media.
_UPLOAD_MEDIA_LOCK_NAME = "upload_media"
class UploadResource(RestServlet):
PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload")]
class BaseUploadServlet(RestServlet):
def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"):
super().__init__() super().__init__()
self.media_repo = media_repo self.media_repo = media_repo
self.filepaths = media_repo.filepaths self.filepaths = media_repo.filepaths
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.clock = hs.get_clock() self.server_name = hs.hostname
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.max_upload_size = hs.config.media.max_upload_size self.max_upload_size = hs.config.media.max_upload_size
self.clock = hs.get_clock()
async def on_POST(self, request: SynapseRequest) -> None: def _get_file_metadata(
requester = await self.auth.get_user_by_req(request) self, request: SynapseRequest
) -> Tuple[int, Optional[str], str]:
raw_content_length = request.getHeader("Content-Length") raw_content_length = request.getHeader("Content-Length")
if raw_content_length is None: if raw_content_length is None:
raise SynapseError(msg="Request must specify a Content-Length", code=400) raise SynapseError(msg="Request must specify a Content-Length", code=400)
@ -88,6 +89,16 @@ class UploadResource(RestServlet):
# disposition = headers.getRawHeaders(b"Content-Disposition")[0] # disposition = headers.getRawHeaders(b"Content-Disposition")[0]
# TODO(markjh): parse content-disposition # TODO(markjh): parse content-disposition
return content_length, upload_name, media_type
class UploadServlet(BaseUploadServlet):
PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload$")]
async def on_POST(self, request: SynapseRequest) -> None:
requester = await self.auth.get_user_by_req(request)
content_length, upload_name, media_type = self._get_file_metadata(request)
try: try:
content: IO = request.content # type: ignore content: IO = request.content # type: ignore
content_uri = await self.media_repo.create_content( content_uri = await self.media_repo.create_content(
@ -103,3 +114,53 @@ class UploadResource(RestServlet):
respond_with_json( respond_with_json(
request, 200, {"content_uri": str(content_uri)}, send_cors=True request, 200, {"content_uri": str(content_uri)}, send_cors=True
) )
class AsyncUploadServlet(BaseUploadServlet):
PATTERNS = [
re.compile(
"/_matrix/media/v3/upload/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$"
)
]
async def on_PUT(
self, request: SynapseRequest, server_name: str, media_id: str
) -> None:
requester = await self.auth.get_user_by_req(request)
if server_name != self.server_name:
raise SynapseError(
404,
"Non-local server name specified",
errcode=Codes.NOT_FOUND,
)
lock = await self.store.try_acquire_lock(_UPLOAD_MEDIA_LOCK_NAME, media_id)
if not lock:
raise SynapseError(
409,
"Media ID cannot be overwritten",
errcode=Codes.CANNOT_OVERWRITE_MEDIA,
)
async with lock:
await self.media_repo.verify_can_upload(media_id, requester.user)
content_length, upload_name, media_type = self._get_file_metadata(request)
try:
content: IO = request.content # type: ignore
await self.media_repo.update_content(
media_id,
media_type,
upload_name,
content,
content_length,
requester.user,
)
except SpamMediaException:
# For uploading of media we want to respond with a 400, instead of
# the default 404, as that would just be confusing.
raise SynapseError(400, "Bad content")
logger.info("Uploaded content for media ID %r", media_id)
respond_with_json(request, 200, {}, send_cors=True)
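
A hedged sketch of the second half of the MSC2246 flow handled by `AsyncUploadServlet` above, reusing the media ID minted by the create endpoint. Homeserver, token and IDs are placeholders, and the optional `filename` query parameter is assumed to behave as it does on the ordinary upload endpoint.

# Illustrative MSC2246 upload to a previously created media ID.
import requests

with open("avatar.png", "rb") as f:
    resp = requests.put(
        "https://homeserver.example.com/_matrix/media/v3/upload/example.com/abc123",
        params={"filename": "avatar.png"},
        headers={
            "Authorization": "Bearer <access-token>",
            "Content-Type": "image/png",
        },
        data=f,
    )
resp.raise_for_status()  # 409 if the media ID was already written, per the lock above
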

View File

@ -49,7 +49,11 @@ else:
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -746,10 +750,10 @@ class BackgroundUpdater:
The named index will be dropped upon completion of the new index. The named index will be dropped upon completion of the new index.
""" """
def create_index_psql(conn: Connection) -> None: def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
conn.rollback() conn.rollback()
# postgres insists on autocommit for the index # postgres insists on autocommit for the index
conn.set_session(autocommit=True) # type: ignore conn.engine.attempt_to_set_autocommit(conn.conn, True)
try: try:
c = conn.cursor() c = conn.cursor()
@ -793,9 +797,9 @@ class BackgroundUpdater:
undo_timeout_sql = f"SET statement_timeout = {default_timeout}" undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
conn.cursor().execute(undo_timeout_sql) conn.cursor().execute(undo_timeout_sql)
conn.set_session(autocommit=False) # type: ignore conn.engine.attempt_to_set_autocommit(conn.conn, False)
def create_index_sqlite(conn: Connection) -> None: def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
# Sqlite doesn't support concurrent creation of indexes. # Sqlite doesn't support concurrent creation of indexes.
# #
# We assume that sqlite doesn't give us invalid indices; however # We assume that sqlite doesn't give us invalid indices; however
@ -825,7 +829,9 @@ class BackgroundUpdater:
c.execute(sql) c.execute(sql)
if isinstance(self.db_pool.engine, engines.PostgresEngine): if isinstance(self.db_pool.engine, engines.PostgresEngine):
runner: Optional[Callable[[Connection], None]] = create_index_psql runner: Optional[
Callable[[LoggingDatabaseConnection], None]
] = create_index_psql
elif psql_only: elif psql_only:
runner = None runner = None
else: else:
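
For context, a minimal sketch of what an engine-level helper like the `attempt_to_set_autocommit` call above might look like. This is an assumption about its shape (psycopg2 connections underneath, SQLite ignoring the request), not a copy of the real implementation.

# Hedged sketch: engine-specific autocommit toggling.
class PostgresEngineSketch:
    def attempt_to_set_autocommit(self, conn, autocommit: bool) -> None:
        # CREATE INDEX CONCURRENTLY refuses to run inside a transaction block,
        # so the connection has to be switched to autocommit first.
        conn.set_session(autocommit=autocommit)

class Sqlite3EngineSketch:
    def attempt_to_set_autocommit(self, conn, autocommit: bool) -> None:
        # SQLite has no equivalent restriction; treat the request as a no-op.
        pass
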

View File

@ -45,7 +45,7 @@ class Databases(Generic[DataStoreT]):
""" """
databases: List[DatabasePool] databases: List[DatabasePool]
main: "DataStore" # FIXME: #11165: actually an instance of `main_store_class` main: "DataStore" # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
state: StateGroupDataStore state: StateGroupDataStore
persist_events: Optional[PersistEventsStore] persist_events: Optional[PersistEventsStore]

View File

@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
) )
# Invalidate the cache for any ignored users which were added or removed. # Invalidate the cache for any ignored users which were added or removed.
for ignored_user_id in previously_ignored_users ^ currently_ignored_users: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) txn,
self.ignored_by,
[
(ignored_user_id,)
for ignored_user_id in (
previously_ignored_users ^ currently_ignored_users
)
],
)
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
async def remove_account_data_for_user( async def remove_account_data_for_user(
@ -824,9 +832,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
) )
# Invalidate the cache for ignored users which were removed. # Invalidate the cache for ignored users which were removed.
for ignored_user_id in previously_ignored_users: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream( txn,
txn, self.ignored_by, (ignored_user_id,) self.ignored_by,
[
(ignored_user_id,)
for ignored_user_id in previously_ignored_users
],
) )
# Invalidate for this user the cache tracking ignored users. # Invalidate for this user the cache tracking ignored users.

View File

@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id: str, user_id: str,
device_id: Optional[str], device_id: Optional[str],
up_to_stream_id: int, up_to_stream_id: int,
limit: Optional[int] = None,
) -> int: ) -> int:
""" """
Args: Args:
user_id: The recipient user_id. user_id: The recipient user_id.
device_id: The recipient device_id. device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to. up_to_stream_id: Where to delete messages up to.
limit: maximum number of messages to delete
Returns: Returns:
The number of messages deleted. The number of messages deleted.
@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"}) log_kv({"message": "No changes in cache since last check"})
return 0 return 0
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: from_stream_id = None
limit_statement = "" if limit is None else f"LIMIT {limit}" count = 0
sql = f""" while True:
DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= ( from_stream_id, loop_count = await self.delete_messages_for_device_between(
SELECT MAX(stream_id) FROM ( user_id,
SELECT stream_id FROM device_inbox device_id,
WHERE user_id = ? AND device_id = ? AND stream_id <= ? from_stream_id=from_stream_id,
ORDER BY stream_id to_stream_id=up_to_stream_id,
{limit_statement} limit=1000,
) AS q1
)
"""
txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
return txn.rowcount
count = await self.db_pool.runInteraction(
"delete_messages_for_device", delete_messages_for_device_txn
) )
count += loop_count
if from_stream_id is None:
break
log_kv({"message": f"deleted {count} messages for device", "count": count}) log_kv({"message": f"deleted {count} messages for device", "count": count})
# In this case we don't know if we hit the limit or the delete is complete
# so let's not update the cache.
if count == limit:
return count
# Update the cache, ensuring that we only ever increase the value # Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get( updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0 (user_id, device_id), 0
@ -514,6 +502,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count return count
@trace
async def delete_messages_for_device_between(
self,
user_id: str,
device_id: Optional[str],
from_stream_id: Optional[int],
to_stream_id: int,
limit: int,
) -> Tuple[Optional[int], int]:
"""Delete N device messages between the stream IDs, returning the
highest stream ID deleted (or None if all messages in the range have
been deleted) and the number of messages deleted.
This is more efficient than `delete_messages_for_device` when calling in
a loop to batch delete messages.
"""
# Keeping track of a lower bound of stream ID where we've deleted
# everything below makes the queries much faster. Otherwise, every time
# we scan for rows to delete we'd re-scan across all the rows that we have
# previously deleted (until the next table VACUUM).
if from_stream_id is None:
# Minimum device stream ID is 1.
from_stream_id = 0
def delete_messages_for_device_between_txn(
txn: LoggingTransaction,
) -> Tuple[Optional[int], int]:
txn.execute(
"""
SELECT MAX(stream_id) FROM (
SELECT stream_id FROM device_inbox
WHERE user_id = ? AND device_id = ?
AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id
LIMIT ?
) AS d
""",
(user_id, device_id, from_stream_id, to_stream_id, limit),
)
row = txn.fetchone()
if row is None or row[0] is None:
return None, 0
(max_stream_id,) = row
txn.execute(
"""
DELETE FROM device_inbox
WHERE user_id = ? AND device_id = ?
AND ? < stream_id AND stream_id <= ?
""",
(user_id, device_id, from_stream_id, max_stream_id),
)
num_deleted = txn.rowcount
if num_deleted < limit:
return None, num_deleted
return max_stream_id, num_deleted
return await self.db_pool.runInteraction(
"delete_messages_for_device_between",
delete_messages_for_device_between_txn,
db_autocommit=True, # We don't need to run in a transaction
)
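
A hedged sketch of how a caller can drive the batched deletion above, mirroring the loop that `delete_messages_for_device` now uses; the function and variable names here are illustrative.

# Illustrative driver loop for the batched delete (store, user and device
# are assumed to exist in the calling context).
async def purge_device_inbox(store, user_id: str, device_id: str, up_to: int) -> int:
    deleted = 0
    from_stream_id = None
    while True:
        from_stream_id, count = await store.delete_messages_for_device_between(
            user_id,
            device_id,
            from_stream_id=from_stream_id,
            to_stream_id=up_to,
            limit=1000,
        )
        deleted += count
        if from_stream_id is None:
            # Everything in the range has been deleted.
            break
    return deleted
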
@trace @trace
async def get_new_device_msgs_for_remote( async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int self, destination: str, last_stream_id: int, current_stream_id: int, limit: int

View File

@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
return otk_rows return otk_rows
async def get_master_cross_signing_key_updatable_before(
self, user_id: str
) -> Tuple[bool, Optional[int]]:
"""Get time before which a master cross-signing key may be replaced without UIA.
(UIA means "User-Interactive Auth".)
There are three cases to distinguish:
(1) No master cross-signing key.
(2) The key exists, but there is no replace-without-UIA timestamp in the DB.
(3) The key exists, and has such a timestamp recorded.
Returns: a 2-tuple of:
- a boolean: is there a master cross-signing key already?
- an optional timestamp, directly taken from the DB.
In terms of the cases above, these are:
(1) (False, None).
(2) (True, None).
(3) (True, <timestamp in ms>).
"""
def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
# We want to distinguish between three cases:
txn.execute(
"""
SELECT updatable_without_uia_before_ms
FROM e2e_cross_signing_keys
WHERE user_id = ? AND keytype = 'master'
ORDER BY stream_id DESC
LIMIT 1
""",
(user_id,),
)
row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
if row is None:
return False, None
return True, row[0]
return await self.db_pool.runInteraction(
"e2e_cross_signing_keys",
impl,
)
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def __init__( def __init__(
@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
], ],
desc="add_e2e_signing_key", desc="add_e2e_signing_key",
) )
async def allow_master_cross_signing_key_replacement_without_uia(
self, user_id: str, duration_ms: int
) -> Optional[int]:
"""Mark this user's latest master key as being replaceable without UIA.
Said replacement will only be permitted for a short time after calling this
function. That time period is controlled by the duration argument.
Returns:
None, if there is no such key.
Otherwise, the timestamp before which replacement is allowed without UIA.
"""
timestamp = self._clock.time_msec() + duration_ms
def impl(txn: LoggingTransaction) -> Optional[int]:
txn.execute(
"""
UPDATE e2e_cross_signing_keys
SET updatable_without_uia_before_ms = ?
WHERE stream_id = (
SELECT stream_id
FROM e2e_cross_signing_keys
WHERE user_id = ? AND keytype = 'master'
ORDER BY stream_id DESC
LIMIT 1
)
""",
(timestamp, user_id),
)
if txn.rowcount == 0:
return None
return timestamp
return await self.db_pool.runInteraction(
"allow_master_cross_signing_key_replacement_without_uia",
impl,
)

View File

@ -425,7 +425,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
"""Background update to clean out extremities that should have been """Background update to clean out extremities that should have been
deleted previously. deleted previously.
Mainly used to deal with the aftermath of #5269. Mainly used to deal with the aftermath of https://github.com/matrix-org/synapse/issues/5269.
""" """
# This works by first copying all existing forward extremities into the # This works by first copying all existing forward extremities into the
@ -558,7 +558,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
) )
logger.info( logger.info(
"Deleted %d forward extremities of %d checked, to clean up #5269", "Deleted %d forward extremities of %d checked, to clean up matrix-org/synapse#5269",
deleted, deleted,
len(original_set), len(original_set),
) )
@ -1222,13 +1222,12 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
) )
# Iterate the parent IDs and invalidate caches. # Iterate the parent IDs and invalidate caches.
for parent_id in {r[1] for r in relations_to_insert}: cache_tuples = {(r[1],) for r in relations_to_insert}
cache_tuple = (parent_id,) self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
self._invalidate_cache_and_stream( # type: ignore[attr-defined] txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined]
txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
) )
self._invalidate_cache_and_stream( # type: ignore[attr-defined] self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined] txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined]
) )
if results: if results:

View File

@ -1312,7 +1312,8 @@ class EventsWorkerStore(SQLBaseStore):
room_version: Optional[RoomVersion] room_version: Optional[RoomVersion]
if not room_version_id: if not room_version_id:
# this should only happen for out-of-band membership events which # this should only happen for out-of-band membership events which
# arrived before #6983 landed. For all other events, we should have # arrived before https://github.com/matrix-org/synapse/issues/6983
# landed. For all other events, we should have
# an entry in the 'rooms' table. # an entry in the 'rooms' table.
# #
# However, the 'out_of_band_membership' flag is unreliable for older # However, the 'out_of_band_membership' flag is unreliable for older
@ -1323,7 +1324,8 @@ class EventsWorkerStore(SQLBaseStore):
"Room %s for event %s is unknown" % (d["room_id"], event_id) "Room %s for event %s is unknown" % (d["room_id"], event_id)
) )
# so, assuming this is an out-of-band-invite that arrived before #6983 # so, assuming this is an out-of-band-invite that arrived before
# https://github.com/matrix-org/synapse/issues/6983
# landed, we know that the room version must be v5 or earlier (because # landed, we know that the room version must be v5 or earlier (because
# v6 hadn't been invented at that point, so invites from such rooms # v6 hadn't been invented at that point, so invites from such rooms
# would have been rejected.) # would have been rejected.)

View File

@ -107,12 +107,15 @@ class KeyStore(CacheInvalidationWorkerStore):
# invalidate takes a tuple corresponding to the params of # invalidate takes a tuple corresponding to the params of
# _get_server_keys_json. _get_server_keys_json only takes one # _get_server_keys_json. _get_server_keys_json only takes one
# param, which is itself the 2-tuple (server_name, key_id). # param, which is itself the 2-tuple (server_name, key_id).
for key_id in verify_keys: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream( txn,
txn, self._get_server_keys_json, ((server_name, key_id),) self._get_server_keys_json,
[((server_name, key_id),) for key_id in verify_keys],
) )
self._invalidate_cache_and_stream( self._invalidate_cache_and_stream_bulk(
txn, self.get_server_key_json_for_remote, (server_name, key_id) txn,
self.get_server_key_json_for_remote,
[(server_name, key_id) for key_id in verify_keys],
) )
await self.db_pool.runInteraction( await self.db_pool.runInteraction(

View File

@ -49,13 +49,14 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
class LocalMedia: class LocalMedia:
media_id: str media_id: str
media_type: str media_type: str
media_length: int media_length: Optional[int]
upload_name: str upload_name: str
created_ts: int created_ts: int
url_cache: Optional[str] url_cache: Optional[str]
last_access_ts: int last_access_ts: int
quarantined_by: Optional[str] quarantined_by: Optional[str]
safe_from_quarantine: bool safe_from_quarantine: bool
user_id: Optional[str]
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
@ -149,6 +150,13 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
self._drop_media_index_without_method, self._drop_media_index_without_method,
) )
if hs.config.media.can_load_media_repo:
self.unused_expiration_time: Optional[
int
] = hs.config.media.unused_expiration_time
else:
self.unused_expiration_time = None
async def _drop_media_index_without_method( async def _drop_media_index_without_method(
self, progress: JsonDict, batch_size: int self, progress: JsonDict, batch_size: int
) -> int: ) -> int:
@ -202,6 +210,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"url_cache", "url_cache",
"last_access_ts", "last_access_ts",
"safe_from_quarantine", "safe_from_quarantine",
"user_id",
), ),
allow_none=True, allow_none=True,
desc="get_local_media", desc="get_local_media",
@ -218,6 +227,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
url_cache=row[5], url_cache=row[5],
last_access_ts=row[6], last_access_ts=row[6],
safe_from_quarantine=row[7], safe_from_quarantine=row[7],
user_id=row[8],
) )
async def get_local_media_by_user_paginate( async def get_local_media_by_user_paginate(
@ -272,7 +282,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
url_cache, url_cache,
last_access_ts, last_access_ts,
quarantined_by, quarantined_by,
safe_from_quarantine safe_from_quarantine,
user_id
FROM local_media_repository FROM local_media_repository
WHERE user_id = ? WHERE user_id = ?
ORDER BY {order_by_column} {order}, media_id ASC ORDER BY {order_by_column} {order}, media_id ASC
@ -295,6 +306,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
last_access_ts=row[6], last_access_ts=row[6],
quarantined_by=row[7], quarantined_by=row[7],
safe_from_quarantine=bool(row[8]), safe_from_quarantine=bool(row[8]),
user_id=row[9],
) )
for row in txn for row in txn
] ]
@ -391,6 +403,23 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"get_local_media_ids", _get_local_media_ids_txn "get_local_media_ids", _get_local_media_ids_txn
) )
@trace
async def store_local_media_id(
self,
media_id: str,
time_now_ms: int,
user_id: UserID,
) -> None:
await self.db_pool.simple_insert(
"local_media_repository",
{
"media_id": media_id,
"created_ts": time_now_ms,
"user_id": user_id.to_string(),
},
desc="store_local_media_id",
)
@trace @trace
async def store_local_media( async def store_local_media(
self, self,
@ -416,6 +445,30 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media", desc="store_local_media",
) )
async def update_local_media(
self,
media_id: str,
media_type: str,
upload_name: Optional[str],
media_length: int,
user_id: UserID,
url_cache: Optional[str] = None,
) -> None:
await self.db_pool.simple_update_one(
"local_media_repository",
keyvalues={
"user_id": user_id.to_string(),
"media_id": media_id,
},
updatevalues={
"media_type": media_type,
"upload_name": upload_name,
"media_length": media_length,
"url_cache": url_cache,
},
desc="update_local_media",
)
async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None: async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
"""Mark a local media as safe or unsafe from quarantining.""" """Mark a local media as safe or unsafe from quarantining."""
await self.db_pool.simple_update_one( await self.db_pool.simple_update_one(
@ -425,6 +478,39 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="mark_local_media_as_safe", desc="mark_local_media_as_safe",
) )
async def count_pending_media(self, user_id: UserID) -> Tuple[int, int]:
"""Count the number of pending media for a user.
Returns:
A tuple of two integers: the total pending media requests and the earliest
expiration timestamp.
"""
def get_pending_media_txn(txn: LoggingTransaction) -> Tuple[int, int]:
sql = """
SELECT COUNT(*), MIN(created_ts)
FROM local_media_repository
WHERE user_id = ?
AND created_ts > ?
AND media_length IS NULL
"""
assert self.unused_expiration_time is not None
txn.execute(
sql,
(
user_id.to_string(),
self._clock.time_msec() - self.unused_expiration_time,
),
)
row = txn.fetchone()
if not row:
return 0, 0
return row[0], (row[1] + self.unused_expiration_time if row[1] else 0)
return await self.db_pool.runInteraction(
"get_pending_media", get_pending_media_txn
)
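
A hedged worked example of the expiry arithmetic returned by `count_pending_media` above, with illustrative numbers only; the 24-hour window is an assumption, not a Synapse default. `CreateResource` earlier in this commit turns the same kind of timestamp into `retry_after_ms` when the pending limit is hit.

# Illustrative numbers: when the oldest pending upload stops counting.
unused_expiration_time_ms = 24 * 60 * 60 * 1000   # assumed 24h window
oldest_pending_created_ts = 1_700_000_000_000     # MIN(created_ts) from the query
first_expiration_ts = oldest_pending_created_ts + unused_expiration_time_ms
now_ms = 1_700_000_050_000
retry_after_ms = first_expiration_ts - now_ms     # what the client is told to wait
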
async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]: async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]:
"""Get the media_id and ts for a cached URL as of the given timestamp """Get the media_id and ts for a cached URL as of the given timestamp
Returns: Returns:

View File

@ -317,7 +317,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
if user_id: if user_id:
is_support = self.is_support_user_txn(txn, user_id) is_support = self.is_support_user_txn(txn, user_id)
if not is_support: if not is_support:
# We do this manually here to avoid hitting #6791 # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
self.db_pool.simple_upsert_txn( self.db_pool.simple_upsert_txn(
txn, txn,
table="monthly_active_users", table="monthly_active_users",

View File

@ -363,9 +363,10 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
# for their user ID. # for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids], value_values=[(presence_stream_id,) for _ in user_ids],
) )
for user_id in user_ids: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream( txn,
txn, self._get_full_presence_stream_token_for_user, (user_id,) self._get_full_presence_stream_token_for_user,
[(user_id,) for user_id in user_ids],
) )
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(

View File

@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so make sure to keep this actually last. # so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge") txn.execute("DROP TABLE events_to_purge")
for event_id, should_delete in event_rows: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream( txn,
txn, self._get_state_group_for_event, (event_id,) self._get_state_group_for_event,
[(event_id,) for event_id, _ in event_rows],
) )
# XXX: This is racy, since have_seen_events could be called between the # XXX: This is racy, since have_seen_events could be called between the
# transaction completing and the invalidation running. On the other hand, # transaction completing and the invalidation running. On the other hand,
# that's no different to calling `have_seen_events` just before the # that's no different to calling `have_seen_events` just before the
# event is deleted from the database. # event is deleted from the database.
if should_delete: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream( txn,
txn, self.have_seen_event, (room_id, event_id) self.have_seen_event,
[
(room_id, event_id)
for event_id, should_delete in event_rows
if should_delete
],
) )
for event_id, should_delete in event_rows:
if should_delete:
self.invalidate_get_event_cache_after_txn(txn, event_id) self.invalidate_get_event_cache_after_txn(txn, event_id)
logger.info("[purge] done") logger.info("[purge] done")
@ -485,7 +494,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# - room_tags_revisions # - room_tags_revisions
# The problem with these is that they are largeish and there is no room_id # The problem with these is that they are largeish and there is no room_id
# index on them. In any case we should be clearing out 'stream' tables # index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888) # periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
self._invalidate_caches_for_room_and_stream(txn, room_id) self._invalidate_caches_for_room_and_stream(txn, room_id)

View File

@ -449,26 +449,28 @@ class PushRuleStore(PushRulesWorkerStore):
before: str, before: str,
after: str, after: str,
) -> None: ) -> None:
# Lock the table since otherwise we'll have annoying races between the
# SELECT here and the UPSERT below.
self.database_engine.lock_table(txn, "push_rules")
relative_to_rule = before or after relative_to_rule = before or after
res = self.db_pool.simple_select_one_txn( sql = """
txn, SELECT priority, priority_class FROM push_rules
table="push_rules", WHERE user_name = ? AND rule_id = ?
keyvalues={"user_name": user_id, "rule_id": relative_to_rule}, """
retcols=["priority_class", "priority"],
allow_none=True,
)
if not res: if isinstance(self.database_engine, PostgresEngine):
sql += " FOR UPDATE"
else:
# Annoyingly SQLite doesn't support row level locking, so lock the whole table
self.database_engine.lock_table(txn, "push_rules")
txn.execute(sql, (user_id, relative_to_rule))
row = txn.fetchone()
if row is None:
raise RuleNotFoundException( raise RuleNotFoundException(
"before/after rule not found: %s" % (relative_to_rule,) "before/after rule not found: %s" % (relative_to_rule,)
) )
base_priority_class, base_rule_priority = res base_rule_priority, base_priority_class = row
if base_priority_class != priority_class: if base_priority_class != priority_class:
raise InconsistentRuleException( raise InconsistentRuleException(
@ -516,8 +518,17 @@ class PushRuleStore(PushRulesWorkerStore):
conditions_json: str, conditions_json: str,
actions_json: str, actions_json: str,
) -> None: ) -> None:
# Lock the table since otherwise we'll have annoying races between the if isinstance(self.database_engine, PostgresEngine):
# SELECT here and the UPSERT below. # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first
# then re-select the count/max below.
sql = """
SELECT * FROM push_rules
WHERE user_name = ? and priority_class = ?
FOR UPDATE
"""
txn.execute(sql, (user_id, priority_class))
else:
# Annoyingly SQLite doesn't support row level locking, so lock the whole table
self.database_engine.lock_table(txn, "push_rules") self.database_engine.lock_table(txn, "push_rules")
# find the highest priority rule in that class # find the highest priority rule in that class
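
The hunk above swaps a whole-table lock for row-level locking on Postgres. A hedged, standalone illustration of the same pattern: the table and column names come from the diff, while the connection handling and parameter values are placeholders.

# Hedged illustration of SELECT ... FOR UPDATE row locking on Postgres,
# assuming a psycopg2 connection; SQLite would still need a table lock.
import psycopg2

conn = psycopg2.connect("dbname=synapse")  # placeholder DSN
with conn, conn.cursor() as cur:
    # Locks only the matching push_rules rows until commit, so concurrent
    # writers touching other users' rules are not blocked.
    cur.execute(
        """
        SELECT priority, priority_class FROM push_rules
        WHERE user_name = %s AND rule_id = %s
        FOR UPDATE
        """,
        ("@alice:example.com", ".m.rule.contains_user_name"),
    )
    row = cur.fetchone()
    # ... compute the new priorities and UPSERT while the lock is held ...
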

View File

@ -561,15 +561,14 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
updatevalues={"shadow_banned": shadow_banned}, updatevalues={"shadow_banned": shadow_banned},
) )
# In order for this to apply immediately, clear the cache for this user. # In order for this to apply immediately, clear the cache for this user.
tokens = self.db_pool.simple_select_onecol_txn( tokens = self.db_pool.simple_select_list_txn(
txn, txn,
table="access_tokens", table="access_tokens",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
retcol="token", retcols=("token",),
) )
for token in tokens: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream( txn, self.get_user_by_access_token, tokens
txn, self.get_user_by_access_token, (token,)
) )
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@ -2683,9 +2682,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
) )
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn] tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
for token, _, _ in tokens_and_devices: self._invalidate_cache_and_stream_bulk(
self._invalidate_cache_and_stream( txn,
txn, self.get_user_by_access_token, (token,) self.get_user_by_access_token,
[(token,) for token, _, _ in tokens_and_devices],
) )
txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values) txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)

View File

@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we have to set autocommit, because postgres refuses to # we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it. # CREATE INDEX CONCURRENTLY without it.
conn.set_session(autocommit=True) conn.engine.attempt_to_set_autocommit(conn.conn, True)
try: try:
c = conn.cursor() c = conn.cursor()
@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we should now be able to delete the GIST index. # we should now be able to delete the GIST index.
c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist") c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
finally: finally:
conn.set_session(autocommit=False) conn.engine.attempt_to_set_autocommit(conn.conn, False)
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
await self.db_pool.runWithConnection(create_index) await self.db_pool.runWithConnection(create_index)
@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
def create_index(conn: LoggingDatabaseConnection) -> None: def create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback() conn.rollback()
conn.set_session(autocommit=True) conn.engine.attempt_to_set_autocommit(conn.conn, True)
c = conn.cursor() c = conn.cursor()
# We create with NULLS FIRST so that when we search *backwards* # We create with NULLS FIRST so that when we search *backwards*
@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST) ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
""" """
) )
conn.set_session(autocommit=False) conn.engine.attempt_to_set_autocommit(conn.conn, False)
await self.db_pool.runWithConnection(create_index) await self.db_pool.runWithConnection(create_index)

View File

@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
conn.rollback() conn.rollback()
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
# postgres insists on autocommit for the index # postgres insists on autocommit for the index
conn.set_session(autocommit=True) conn.engine.attempt_to_set_autocommit(conn.conn, True)
try: try:
txn = conn.cursor() txn = conn.cursor()
txn.execute( txn.execute(
@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
) )
txn.execute("DROP INDEX IF EXISTS state_groups_state_id") txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
finally: finally:
conn.set_session(autocommit=False) conn.engine.attempt_to_set_autocommit(conn.conn, False)
else: else:
txn = conn.cursor() txn = conn.cursor()
txn.execute( txn.execute(

View File

@ -38,7 +38,8 @@ class PostgresEngine(
super().__init__(psycopg2, database_config) super().__init__(psycopg2, database_config)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
# Disables passing `bytes` to txn.execute, c.f. #6186. If you do # Disables passing `bytes` to txn.execute, c.f.
# https://github.com/matrix-org/synapse/issues/6186. If you do
# actually want to use bytes then wrap it in `bytearray`. # actually want to use bytes then wrap it in `bytearray`.
def _disable_bytes_adapter(_: bytes) -> NoReturn: def _disable_bytes_adapter(_: bytes) -> NoReturn:
raise Exception("Passing bytes to DB is disabled.") raise Exception("Passing bytes to DB is disabled.")

View File

@ -109,7 +109,8 @@ Changes in SCHEMA_VERSION = 78
Changes in SCHEMA_VERSION = 79 Changes in SCHEMA_VERSION = 79
- Add tables to handle in DB read-write locks. - Add tables to handle in DB read-write locks.
- Add some mitigations for a painful race between foreground and background updates, cf #15677. - Add some mitigations for a painful race between foreground and background updates, cf
https://github.com/matrix-org/synapse/issues/15677.
Changes in SCHEMA_VERSION = 80 Changes in SCHEMA_VERSION = 80
- The event_txn_id_device_id is always written to for new events. - The event_txn_id_device_id is always written to for new events.

View File

@ -0,0 +1,30 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- Re-use unique indices already defined on tables as a replica identity.
ALTER TABLE applied_module_schemas REPLICA IDENTITY USING INDEX applied_module_schemas_module_name_file_key;
ALTER TABLE applied_schema_deltas REPLICA IDENTITY USING INDEX applied_schema_deltas_version_file_key;
ALTER TABLE background_updates REPLICA IDENTITY USING INDEX background_updates_uniqueness;
ALTER TABLE schema_compat_version REPLICA IDENTITY USING INDEX schema_compat_version_lock_key;
ALTER TABLE schema_version REPLICA IDENTITY USING INDEX schema_version_lock_key;
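
To sanity-check migrations like the one above, a hedged helper that lists tables without a usable replica identity (default REPLICA IDENTITY but no primary key); the catalog query is standard Postgres, while the connection string and schema name are placeholders.

# Hedged check: find tables that logical replication could not UPDATE/DELETE
# from, i.e. REPLICA IDENTITY is still DEFAULT and the table has no primary key.
import psycopg2

QUERY = """
SELECT c.relname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
  AND c.relreplident = 'd'
  AND NOT EXISTS (
      SELECT 1 FROM pg_constraint con
      WHERE con.conrelid = c.oid AND con.contype = 'p'
  )
ORDER BY c.relname;
"""

with psycopg2.connect("dbname=synapse") as conn, conn.cursor() as cur:  # placeholder DSN
    cur.execute(QUERY)
    for (table_name,) in cur.fetchall():
        print(table_name)
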

View File

@ -14,7 +14,7 @@
*/ */
-- Start a background job to cleanup extremities that were incorrectly added -- Start a background job to cleanup extremities that were incorrectly added
-- by bug #5269. -- by bug https://github.com/matrix-org/synapse/issues/5269.
INSERT INTO background_updates (update_name, progress_json) VALUES INSERT INTO background_updates (update_name, progress_json) VALUES
('delete_soft_failed_extremities', '{}'); ('delete_soft_failed_extremities', '{}');

View File

@ -13,6 +13,7 @@
* limitations under the License. * limitations under the License.
*/ */
-- Now that #6232 is a thing, we can remove old rooms from the directory. -- Now that https://github.com/matrix-org/synapse/pull/6232 is a thing, we can
-- remove old rooms from the directory.
INSERT INTO background_updates (update_name, progress_json) VALUES INSERT INTO background_updates (update_name, progress_json) VALUES
('remove_tombstoned_rooms_from_directory', '{}'); ('remove_tombstoned_rooms_from_directory', '{}');

View File

@ -13,7 +13,8 @@
* limitations under the License. * limitations under the License.
*/ */
-- Clean up left over rows from bug #11833, which was fixed in #12770. -- Clean up left over rows from bug https://github.com/matrix-org/synapse/issues/11833,
-- which was fixed in https://github.com/matrix-org/synapse/pull/12770.
DELETE FROM federation_inbound_events_staging WHERE room_id not in ( DELETE FROM federation_inbound_events_staging WHERE room_id not in (
SELECT room_id FROM rooms SELECT room_id FROM rooms
); );

View File

@ -0,0 +1,88 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- Where possible, re-use unique indices already defined on tables as a replica
-- identity.
ALTER TABLE appservice_room_list REPLICA IDENTITY USING INDEX appservice_room_list_idx;
ALTER TABLE batch_events REPLICA IDENTITY USING INDEX chunk_events_event_id;
ALTER TABLE blocked_rooms REPLICA IDENTITY USING INDEX blocked_rooms_idx;
ALTER TABLE cache_invalidation_stream_by_instance REPLICA IDENTITY USING INDEX cache_invalidation_stream_by_instance_id;
ALTER TABLE device_lists_changes_in_room REPLICA IDENTITY USING INDEX device_lists_changes_in_stream_id;
ALTER TABLE device_lists_outbound_last_success REPLICA IDENTITY USING INDEX device_lists_outbound_last_success_unique_idx;
ALTER TABLE device_lists_remote_cache REPLICA IDENTITY USING INDEX device_lists_remote_cache_unique_id;
ALTER TABLE device_lists_remote_extremeties REPLICA IDENTITY USING INDEX device_lists_remote_extremeties_unique_idx;
ALTER TABLE device_lists_remote_resync REPLICA IDENTITY USING INDEX device_lists_remote_resync_idx;
ALTER TABLE e2e_cross_signing_keys REPLICA IDENTITY USING INDEX e2e_cross_signing_keys_stream_idx;
ALTER TABLE e2e_room_keys REPLICA IDENTITY USING INDEX e2e_room_keys_with_version_idx;
ALTER TABLE e2e_room_keys_versions REPLICA IDENTITY USING INDEX e2e_room_keys_versions_idx;
ALTER TABLE erased_users REPLICA IDENTITY USING INDEX erased_users_user;
ALTER TABLE event_relations REPLICA IDENTITY USING INDEX event_relations_id;
ALTER TABLE federation_inbound_events_staging REPLICA IDENTITY USING INDEX federation_inbound_events_staging_instance_event;
ALTER TABLE federation_stream_position REPLICA IDENTITY USING INDEX federation_stream_position_instance;
ALTER TABLE ignored_users REPLICA IDENTITY USING INDEX ignored_users_uniqueness;
ALTER TABLE insertion_events REPLICA IDENTITY USING INDEX insertion_events_event_id;
ALTER TABLE insertion_event_extremities REPLICA IDENTITY USING INDEX insertion_event_extremities_event_id;
ALTER TABLE monthly_active_users REPLICA IDENTITY USING INDEX monthly_active_users_users;
ALTER TABLE ratelimit_override REPLICA IDENTITY USING INDEX ratelimit_override_idx;
ALTER TABLE room_stats_earliest_token REPLICA IDENTITY USING INDEX room_stats_earliest_token_idx;
ALTER TABLE room_stats_state REPLICA IDENTITY USING INDEX room_stats_state_room;
ALTER TABLE stream_positions REPLICA IDENTITY USING INDEX stream_positions_idx;
ALTER TABLE user_directory REPLICA IDENTITY USING INDEX user_directory_user_idx;
ALTER TABLE user_directory_search REPLICA IDENTITY USING INDEX user_directory_search_user_idx;
ALTER TABLE user_ips REPLICA IDENTITY USING INDEX user_ips_user_token_ip_unique_index;
ALTER TABLE user_signature_stream REPLICA IDENTITY USING INDEX user_signature_stream_idx;
ALTER TABLE users_in_public_rooms REPLICA IDENTITY USING INDEX users_in_public_rooms_u_idx;
ALTER TABLE users_who_share_private_rooms REPLICA IDENTITY USING INDEX users_who_share_private_rooms_u_idx;
ALTER TABLE user_threepid_id_server REPLICA IDENTITY USING INDEX user_threepid_id_server_idx;
ALTER TABLE worker_locks REPLICA IDENTITY USING INDEX worker_locks_key;
-- Where there are no unique indices, use the entire rows as replica identities.
ALTER TABLE current_state_delta_stream REPLICA IDENTITY FULL;
ALTER TABLE deleted_pushers REPLICA IDENTITY FULL;
ALTER TABLE device_auth_providers REPLICA IDENTITY FULL;
ALTER TABLE device_federation_inbox REPLICA IDENTITY FULL;
ALTER TABLE device_federation_outbox REPLICA IDENTITY FULL;
ALTER TABLE device_inbox REPLICA IDENTITY FULL;
ALTER TABLE device_lists_outbound_pokes REPLICA IDENTITY FULL;
ALTER TABLE device_lists_stream REPLICA IDENTITY FULL;
ALTER TABLE e2e_cross_signing_signatures REPLICA IDENTITY FULL;
ALTER TABLE event_auth_chain_links REPLICA IDENTITY FULL;
ALTER TABLE event_auth REPLICA IDENTITY FULL;
ALTER TABLE event_push_actions_staging REPLICA IDENTITY FULL;
ALTER TABLE insertion_event_edges REPLICA IDENTITY FULL;
ALTER TABLE local_media_repository_url_cache REPLICA IDENTITY FULL;
ALTER TABLE presence_stream REPLICA IDENTITY FULL;
ALTER TABLE push_rules_stream REPLICA IDENTITY FULL;
ALTER TABLE room_alias_servers REPLICA IDENTITY FULL;
ALTER TABLE stream_ordering_to_exterm REPLICA IDENTITY FULL;
ALTER TABLE timeline_gaps REPLICA IDENTITY FULL;
ALTER TABLE user_daily_visits REPLICA IDENTITY FULL;
ALTER TABLE users_pending_deactivation REPLICA IDENTITY FULL;
-- special cases: unique indices on nullable columns can't be used
ALTER TABLE event_push_summary REPLICA IDENTITY FULL;
ALTER TABLE event_search REPLICA IDENTITY FULL;
ALTER TABLE local_media_repository_thumbnails REPLICA IDENTITY FULL;
ALTER TABLE remote_media_cache_thumbnails REPLICA IDENTITY FULL;
ALTER TABLE threepid_guest_access_tokens REPLICA IDENTITY FULL;
ALTER TABLE user_filters REPLICA IDENTITY FULL; -- sadly the `CHECK` constraint is not enough here

View File

@ -0,0 +1,15 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;

View File

@ -0,0 +1,80 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- Where possible, re-use unique indices already defined on tables as a replica
-- identity.
ALTER TABLE account_data REPLICA IDENTITY USING INDEX account_data_uniqueness;
ALTER TABLE application_services_txns REPLICA IDENTITY USING INDEX application_services_txns_as_id_txn_id_key;
ALTER TABLE appservice_stream_position REPLICA IDENTITY USING INDEX appservice_stream_position_lock_key;
ALTER TABLE current_state_events REPLICA IDENTITY USING INDEX current_state_events_event_id_key;
ALTER TABLE device_lists_changes_converted_stream_position REPLICA IDENTITY USING INDEX device_lists_changes_converted_stream_position_lock_key;
ALTER TABLE devices REPLICA IDENTITY USING INDEX device_uniqueness;
ALTER TABLE e2e_device_keys_json REPLICA IDENTITY USING INDEX e2e_device_keys_json_uniqueness;
ALTER TABLE e2e_fallback_keys_json REPLICA IDENTITY USING INDEX e2e_fallback_keys_json_uniqueness;
ALTER TABLE e2e_one_time_keys_json REPLICA IDENTITY USING INDEX e2e_one_time_keys_json_uniqueness;
ALTER TABLE event_backward_extremities REPLICA IDENTITY USING INDEX event_backward_extremities_event_id_room_id_key;
ALTER TABLE event_edges REPLICA IDENTITY USING INDEX event_edges_event_id_prev_event_id_idx;
ALTER TABLE event_forward_extremities REPLICA IDENTITY USING INDEX event_forward_extremities_event_id_room_id_key;
ALTER TABLE event_json REPLICA IDENTITY USING INDEX event_json_event_id_key;
ALTER TABLE event_push_summary_last_receipt_stream_id REPLICA IDENTITY USING INDEX event_push_summary_last_receipt_stream_id_lock_key;
ALTER TABLE event_push_summary_stream_ordering REPLICA IDENTITY USING INDEX event_push_summary_stream_ordering_lock_key;
ALTER TABLE events REPLICA IDENTITY USING INDEX events_event_id_key;
ALTER TABLE event_to_state_groups REPLICA IDENTITY USING INDEX event_to_state_groups_event_id_key;
ALTER TABLE event_txn_id_device_id REPLICA IDENTITY USING INDEX event_txn_id_device_id_event_id;
ALTER TABLE event_txn_id REPLICA IDENTITY USING INDEX event_txn_id_event_id;
ALTER TABLE local_current_membership REPLICA IDENTITY USING INDEX local_current_membership_idx;
ALTER TABLE partial_state_events REPLICA IDENTITY USING INDEX partial_state_events_event_id_key;
ALTER TABLE partial_state_rooms_servers REPLICA IDENTITY USING INDEX partial_state_rooms_servers_room_id_server_name_key;
ALTER TABLE profiles REPLICA IDENTITY USING INDEX profiles_user_id_key;
ALTER TABLE redactions REPLICA IDENTITY USING INDEX redactions_event_id_key;
ALTER TABLE registration_tokens REPLICA IDENTITY USING INDEX registration_tokens_token_key;
ALTER TABLE rejections REPLICA IDENTITY USING INDEX rejections_event_id_key;
ALTER TABLE room_account_data REPLICA IDENTITY USING INDEX room_account_data_uniqueness;
ALTER TABLE room_aliases REPLICA IDENTITY USING INDEX room_aliases_room_alias_key;
ALTER TABLE room_depth REPLICA IDENTITY USING INDEX room_depth_room_id_key;
ALTER TABLE room_forgetter_stream_pos REPLICA IDENTITY USING INDEX room_forgetter_stream_pos_lock_key;
ALTER TABLE room_memberships REPLICA IDENTITY USING INDEX room_memberships_event_id_key;
ALTER TABLE room_tags REPLICA IDENTITY USING INDEX room_tag_uniqueness;
ALTER TABLE room_tags_revisions REPLICA IDENTITY USING INDEX room_tag_revisions_uniqueness;
ALTER TABLE server_keys_json REPLICA IDENTITY USING INDEX server_keys_json_uniqueness;
ALTER TABLE sessions REPLICA IDENTITY USING INDEX sessions_session_type_session_id_key;
ALTER TABLE state_events REPLICA IDENTITY USING INDEX state_events_event_id_key;
ALTER TABLE stats_incremental_position REPLICA IDENTITY USING INDEX stats_incremental_position_lock_key;
ALTER TABLE threads REPLICA IDENTITY USING INDEX threads_uniqueness;
ALTER TABLE ui_auth_sessions_credentials REPLICA IDENTITY USING INDEX ui_auth_sessions_credentials_session_id_stage_type_key;
ALTER TABLE ui_auth_sessions_ips REPLICA IDENTITY USING INDEX ui_auth_sessions_ips_session_id_ip_user_agent_key;
ALTER TABLE ui_auth_sessions REPLICA IDENTITY USING INDEX ui_auth_sessions_session_id_key;
ALTER TABLE user_directory_stream_pos REPLICA IDENTITY USING INDEX user_directory_stream_pos_lock_key;
ALTER TABLE user_external_ids REPLICA IDENTITY USING INDEX user_external_ids_auth_provider_external_id_key;
ALTER TABLE user_threepids REPLICA IDENTITY USING INDEX medium_address;
ALTER TABLE worker_read_write_locks_mode REPLICA IDENTITY USING INDEX worker_read_write_locks_mode_key;
ALTER TABLE worker_read_write_locks REPLICA IDENTITY USING INDEX worker_read_write_locks_key;
-- special cases: unique indices on nullable columns can't be used
ALTER TABLE event_push_actions REPLICA IDENTITY FULL;
ALTER TABLE local_media_repository REPLICA IDENTITY FULL;
ALTER TABLE receipts_graph REPLICA IDENTITY FULL;
ALTER TABLE receipts_linearized REPLICA IDENTITY FULL;
ALTER TABLE received_transactions REPLICA IDENTITY FULL;
ALTER TABLE remote_media_cache REPLICA IDENTITY FULL;
ALTER TABLE server_signature_keys REPLICA IDENTITY FULL;
ALTER TABLE users REPLICA IDENTITY FULL;

View File

@ -0,0 +1,30 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- See also: 82/04_replica_identities.sql.postgres on the main database
-- Where possible, re-use unique indices already defined on tables as a replica
-- identity.
ALTER TABLE state_group_edges REPLICA IDENTITY USING INDEX state_group_edges_unique_idx;
-- Where there are no unique indices, use the entire rows as replica identities.
ALTER TABLE state_groups_state REPLICA IDENTITY FULL;

View File

@ -189,7 +189,8 @@ def check_requirements(extra: Optional[str] = None) -> None:
errors.append(_not_installed(requirement, extra)) errors.append(_not_installed(requirement, extra))
else: else:
if dist.version is None: if dist.version is None:
# This shouldn't happen---it suggests a borked virtualenv. (See #12223) # This shouldn't happen---it suggests a borked virtualenv. (See
# https://github.com/matrix-org/synapse/issues/12223)
# Try to give a vaguely helpful error message anyway. # Try to give a vaguely helpful error message anyway.
# Type-ignore: the annotations don't reflect reality: see # Type-ignore: the annotations don't reflect reality: see
# https://github.com/python/typeshed/issues/7513 # https://github.com/python/typeshed/issues/7513

View File

@ -135,3 +135,54 @@ def sorted_topologically(
degree_map[edge] -= 1 degree_map[edge] -= 1
if degree_map[edge] == 0: if degree_map[edge] == 0:
heapq.heappush(zero_degree, edge) heapq.heappush(zero_degree, edge)
def sorted_topologically_batched(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
) -> Generator[Collection[T], None, None]:
r"""Walk the graph topologically, returning batches of nodes where all nodes
that references it have been previously returned.
For example, given the following graph:
A
/ \
B C
\ /
D
This function will return: `[[A], [B, C], [D]]`.
This function is useful for e.g. batch persisting events in an auth chain,
where we can only persist an event if all its auth events have already been
persisted.
"""
degree_map = {node: 0 for node in nodes}
reverse_graph: Dict[T, Set[T]] = {}
for node, edges in graph.items():
if node not in degree_map:
continue
for edge in set(edges):
if edge in degree_map:
degree_map[node] += 1
reverse_graph.setdefault(edge, set()).add(node)
reverse_graph.setdefault(node, set())
zero_degree = [node for node, degree in degree_map.items() if degree == 0]
while zero_degree:
new_zero_degree = []
for node in zero_degree:
for edge in reverse_graph.get(node, []):
if edge in degree_map:
degree_map[edge] -= 1
if degree_map[edge] == 0:
new_zero_degree.append(edge)
yield zero_degree
zero_degree = new_zero_degree
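As a rough illustration of the new helper (not part of the diff itself), the docstring's example graph can be exercised as below; the import path assumes the function lives next to sorted_topologically in synapse.util.iterutils, and graph maps each node to the nodes it references (e.g. an event to its auth events).

from synapse.util.iterutils import sorted_topologically_batched  # assumed module path

graph = {
    "A": [],
    "B": ["A"],
    "C": ["A"],
    "D": ["B", "C"],
}

# sorted() only makes the output deterministic: ordering *within* a batch is not guaranteed.
batches = [sorted(batch) for batch in sorted_topologically_batched(graph.keys(), graph)]
print(batches)  # [['A'], ['B', 'C'], ['D']]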

View File

@ -193,7 +193,7 @@ class TaskScheduler:
result: Optional[JsonMapping] = None, result: Optional[JsonMapping] = None,
error: Optional[str] = None, error: Optional[str] = None,
) -> bool: ) -> bool:
"""Update some task associated values. This is exposed publically so it can """Update some task associated values. This is exposed publicly so it can
be used inside task functions, mainly to update the result and be able to be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse. resume a task at a specific step after a restart of synapse.

View File

@ -29,5 +29,5 @@ We can't peek into rooms with joined history_visibility
Local users can peek by room alias Local users can peek by room alias
Peeked rooms only turn up in the sync for the device who peeked them Peeked rooms only turn up in the sync for the device who peeked them
# Validation needs to be added to Synapse: #10554 # Validation needs to be added to Synapse: https://github.com/matrix-org/synapse/issues/10554
Rejects invalid device keys Rejects invalid device keys

View File

@ -478,7 +478,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
# expect two edus, in one or two transactions. We don't know what order the # expect two edus, in one or two transactions. We don't know what order the
# devices will be updated. # devices will be updated.
self.assertEqual(len(self.edus), 2) self.assertEqual(len(self.edus), 2)
stream_id = None # FIXME: there is a discontinuity in the stream IDs: see #7142 stream_id = None # FIXME: there is a discontinuity in the stream IDs: see https://github.com/matrix-org/synapse/issues/7142
for edu in self.edus: for edu in self.edus:
self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE) self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE)
c = edu["content"] c = edu["content"]

View File

@ -1602,3 +1602,50 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
} }
}, },
) )
def test_check_cross_signing_setup(self) -> None:
# First check what happens with no master key.
alice = "@alice:test"
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, False)
self.assertIs(replaceable_without_uia, False)
# Upload a master key but don't specify a replacement timestamp.
dummy_key = {"keys": {"a": "b"}}
self.get_success(
self.store.set_e2e_cross_signing_key("@alice:test", "master", dummy_key)
)
# Should now find the key exists.
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, True)
self.assertIs(replaceable_without_uia, False)
# Set an expiry timestamp in the future.
self.get_success(
self.store.allow_master_cross_signing_key_replacement_without_uia(
alice,
1000,
)
)
# Should now be allowed to replace the key without UIA.
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, True)
self.assertIs(replaceable_without_uia, True)
# Wait 2 seconds, so that the timestamp is in the past.
self.reactor.advance(2.0)
# Should no longer be allowed to replace the key without UIA.
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, True)
self.assertIs(replaceable_without_uia, False)

View File

@ -112,7 +112,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
""" """
Check that we store the state group correctly for rejected non-state events. Check that we store the state group correctly for rejected non-state events.
Regression test for #6289. Regression test for https://github.com/matrix-org/synapse/issues/6289.
""" """
OTHER_SERVER = "otherserver" OTHER_SERVER = "otherserver"
OTHER_USER = "@otheruser:" + OTHER_SERVER OTHER_USER = "@otheruser:" + OTHER_SERVER
@ -165,7 +165,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
""" """
Check that we store the state group correctly for rejected state events. Check that we store the state group correctly for rejected state events.
Regression test for #6289. Regression test for https://github.com/matrix-org/synapse/issues/6289.
""" """
OTHER_SERVER = "otherserver" OTHER_SERVER = "otherserver"
OTHER_USER = "@otheruser:" + OTHER_SERVER OTHER_USER = "@otheruser:" + OTHER_SERVER
@ -222,7 +222,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
of backwards extremities (the magic number is more than 5), of backwards extremities (the magic number is more than 5),
no errors are thrown. no errors are thrown.
Regression test, see #11027 Regression test, see https://github.com/matrix-org/synapse/pull/11027
""" """
# create the room # create the room
user_id = self.register_user("kermit", "test") user_id = self.register_user("kermit", "test")

View File

@ -368,7 +368,8 @@ class FederationClientTests(HomeserverTestCase):
""" """
If a connection is made to a client but the client rejects it due to If a connection is made to a client but the client rejects it due to
requiring a trailing slash. We need to retry the request with a requiring a trailing slash. We need to retry the request with a
trailing slash. Workaround for Synapse <= v0.99.3, explained in #3622. trailing slash. Workaround for Synapse <= v0.99.3, explained in
https://github.com/matrix-org/synapse/issues/3622.
""" """
d = defer.ensureDeferred( d = defer.ensureDeferred(
self.cl.get_json("testserv:8008", "foo/bar", try_trailing_slash_on_400=True) self.cl.get_json("testserv:8008", "foo/bar", try_trailing_slash_on_400=True)

View File

@ -318,7 +318,9 @@ class MediaRepoTests(unittest.HomeserverTestCase):
self.assertEqual( self.assertEqual(
self.fetches[0][2], "/_matrix/media/r0/download/" + self.media_id self.fetches[0][2], "/_matrix/media/r0/download/" + self.media_id
) )
self.assertEqual(self.fetches[0][3], {"allow_remote": "false"}) self.assertEqual(
self.fetches[0][3], {"allow_remote": "false", "timeout_ms": "20000"}
)
headers = { headers = {
b"Content-Length": [b"%d" % (len(self.test_image.data))], b"Content-Length": [b"%d" % (len(self.test_image.data))],

View File

@ -92,7 +92,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
- the bad power level value for "room", before JSON serialisation - the bad power level value for "room", before JSON serialisation
- whether Bob should expect the message to be highlighted - whether Bob should expect the message to be highlighted
Reproduces #14060. Reproduces https://github.com/matrix-org/synapse/issues/14060.
A lack of validation: the gift that keeps on giving. A lack of validation: the gift that keeps on giving.
""" """

View File

@ -62,7 +62,7 @@ class ToDeviceStreamTestCase(BaseStreamTestCase):
) )
# add one more message, for user2 this time # add one more message, for user2 this time
# this message would be dropped before fixing #15335 # this message would be dropped before fixing https://github.com/matrix-org/synapse/issues/15335
msg["content"] = {"device": {}} msg["content"] = {"device": {}}
messages = {user2: {"device": msg}} messages = {user2: {"device": msg}}

View File

@ -35,6 +35,10 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler() typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler) assert isinstance(typing, TypingWriterHandler)
# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
self.reconnect() self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True) typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
@ -91,6 +95,10 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler() typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler) assert isinstance(typing, TypingWriterHandler)
# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
self.reconnect() self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True) typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

View File

@ -1478,7 +1478,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
def test_deactivate_user_erase_true_avatar_nonnull_but_empty(self) -> None: def test_deactivate_user_erase_true_avatar_nonnull_but_empty(self) -> None:
"""Check we can erase a user whose avatar is the empty string. """Check we can erase a user whose avatar is the empty string.
Reproduces #12257. Reproduces https://github.com/matrix-org/synapse/issues/12257.
""" """
# Patch `self.other_user` to have an empty string as their avatar. # Patch `self.other_user` to have an empty string as their avatar.
self.get_success( self.get_success(
@ -4854,3 +4854,59 @@ class UsersByThreePidTestCase(unittest.HomeserverTestCase):
{"user_id": self.other_user}, {"user_id": self.other_user},
channel.json_body, channel.json_body,
) )
class AllowCrossSigningReplacementTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
]
@staticmethod
def url(user: str) -> str:
template = (
"/_synapse/admin/v1/users/{}/_allow_cross_signing_replacement_without_uia"
)
return template.format(urllib.parse.quote(user))
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
def test_error_cases(self) -> None:
fake_user = "@bums:other"
channel = self.make_request(
"POST", self.url(fake_user), access_token=self.admin_user_tok
)
# Fail: user doesn't exist
self.assertEqual(404, channel.code, msg=channel.json_body)
channel = self.make_request(
"POST", self.url(self.other_user), access_token=self.admin_user_tok
)
# Fail: user exists, but has no master cross-signing key
self.assertEqual(404, channel.code, msg=channel.json_body)
def test_success(self) -> None:
# Upload a master key.
dummy_key = {"keys": {"a": "b"}}
self.get_success(
self.store.set_e2e_cross_signing_key(self.other_user, "master", dummy_key)
)
channel = self.make_request(
"POST", self.url(self.other_user), access_token=self.admin_user_tok
)
# Success!
self.assertEqual(200, channel.code, msg=channel.json_body)
# Should now find that the key exists.
_, timestamp = self.get_success(
self.store.get_master_cross_signing_key_updatable_before(self.other_user)
)
assert timestamp is not None
self.assertGreater(timestamp, self.clock.time_msec())

View File

@ -64,7 +64,7 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase):
# 403. However, since the v1 spec no longer exists and the v1 # 403. However, since the v1 spec no longer exists and the v1
# implementation is now part of the r0 implementation, the newer # implementation is now part of the r0 implementation, the newer
# behaviour is used instead to be consistent with the r0 spec. # behaviour is used instead to be consistent with the r0 spec.
# see issue #2602 # see issue https://github.com/matrix-org/synapse/issues/2602
channel = self.make_request( channel = self.make_request(
"GET", "/events?access_token=%s" % ("invalid" + self.token,) "GET", "/events?access_token=%s" % ("invalid" + self.token,)
) )

View File

@ -11,8 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License # limitations under the License
import urllib.parse
from http import HTTPStatus from http import HTTPStatus
from unittest.mock import patch
from signedjson.key import ( from signedjson.key import (
encode_verify_key_base64, encode_verify_key_base64,
@ -24,12 +25,19 @@ from signedjson.sign import sign_json
from synapse.api.errors import Codes from synapse.api.errors import Codes
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client import keys, login from synapse.rest.client import keys, login
from synapse.types import JsonDict from synapse.types import JsonDict, Requester, create_requester
from tests import unittest from tests import unittest
from tests.http.server._base import make_request_with_cancellation_test from tests.http.server._base import make_request_with_cancellation_test
from tests.unittest import override_config from tests.unittest import override_config
try:
import authlib # noqa: F401
HAS_AUTHLIB = True
except ImportError:
HAS_AUTHLIB = False
class KeyQueryTestCase(unittest.HomeserverTestCase): class KeyQueryTestCase(unittest.HomeserverTestCase):
servlets = [ servlets = [
@ -259,3 +267,179 @@ class KeyQueryTestCase(unittest.HomeserverTestCase):
alice_token, alice_token,
) )
self.assertEqual(channel.code, HTTPStatus.OK, channel.result) self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
class SigningKeyUploadServletTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
keys.register_servlets,
]
OIDC_ADMIN_TOKEN = "_oidc_admin_token"
@unittest.skip_unless(HAS_AUTHLIB, "requires authlib")
@override_config(
{
"enable_registration": False,
"experimental_features": {
"msc3861": {
"enabled": True,
"issuer": "https://issuer",
"account_management_url": "https://my-account.issuer",
"client_id": "id",
"client_auth_method": "client_secret_post",
"client_secret": "secret",
"admin_token": OIDC_ADMIN_TOKEN,
},
},
}
)
def test_master_cross_signing_key_replacement_msc3861(self) -> None:
# Provision a user like MAS would, cribbing from
# https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L224-L229
alice = "@alice:test"
channel = self.make_request(
"PUT",
f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}",
access_token=self.OIDC_ADMIN_TOKEN,
content={},
)
self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
# Provision a device like MAS would, cribbing from
# https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L260-L262
alice_device = "alice_device"
channel = self.make_request(
"POST",
f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}/devices",
access_token=self.OIDC_ADMIN_TOKEN,
content={"device_id": alice_device},
)
self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
# Prepare a mock MAS access token.
alice_token = "alice_token_1234_oidcwhatyoudidthere"
async def mocked_get_user_by_access_token(
token: str, allow_expired: bool = False
) -> Requester:
self.assertEqual(token, alice_token)
return create_requester(
user_id=alice,
device_id=alice_device,
scope=[],
is_guest=False,
)
patch_get_user_by_access_token = patch.object(
self.hs.get_auth(),
"get_user_by_access_token",
wraps=mocked_get_user_by_access_token,
)
# Copied from E2eKeysHandlerTestCase
master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
master_pubkey2 = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM"
master_pubkey3 = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
master_key: JsonDict = {
"user_id": alice,
"usage": ["master"],
"keys": {"ed25519:" + master_pubkey: master_pubkey},
}
master_key2: JsonDict = {
"user_id": alice,
"usage": ["master"],
"keys": {"ed25519:" + master_pubkey2: master_pubkey2},
}
master_key3: JsonDict = {
"user_id": alice,
"usage": ["master"],
"keys": {"ed25519:" + master_pubkey3: master_pubkey3},
}
with patch_get_user_by_access_token:
# Upload an initial cross-signing key.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key,
},
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
# Should not be able to upload another master key.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key2,
},
)
self.assertEqual(
channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
)
# Pretend that MAS did UIA and allowed us to replace the master key.
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
access_token=self.OIDC_ADMIN_TOKEN,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
with patch_get_user_by_access_token:
# Should now be able to upload master key2.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key2,
},
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
# Even though we're still in the grace period, we shouldn't be able to
# upload master key 3 immediately after uploading key 2.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key3,
},
)
self.assertEqual(
channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
)
# Pretend that MAS did UIA and allowed us to replace the master key.
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
access_token=self.OIDC_ADMIN_TOKEN,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
timestamp_ms = channel.json_body["updatable_without_uia_before_ms"]
# Advance to 1 second after the replacement period ends.
self.reactor.advance(timestamp_ms - self.clock.time_msec() + 1000)
with patch_get_user_by_access_token:
# We should not be able to upload master key3 because the replacement has
# expired.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key3,
},
)
self.assertEqual(
channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
)

View File

@ -170,7 +170,8 @@ class ProfileTestCase(unittest.HomeserverTestCase):
) )
self.assertEqual(channel.code, 200, channel.result) self.assertEqual(channel.code, 200, channel.result)
# FIXME: If a user has no displayname set, Synapse returns 200 and omits a # FIXME: If a user has no displayname set, Synapse returns 200 and omits a
# displayname from the response. This contradicts the spec, see #13137. # displayname from the response. This contradicts the spec, see
# https://github.com/matrix-org/synapse/issues/13137.
return channel.json_body.get("displayname") return channel.json_body.get("displayname")
def _get_avatar_url(self, name: Optional[str] = None) -> Optional[str]: def _get_avatar_url(self, name: Optional[str] = None) -> Optional[str]:
@ -179,7 +180,8 @@ class ProfileTestCase(unittest.HomeserverTestCase):
) )
self.assertEqual(channel.code, 200, channel.result) self.assertEqual(channel.code, 200, channel.result)
# FIXME: If a user has no avatar set, Synapse returns 200 and omits an # FIXME: If a user has no avatar set, Synapse returns 200 and omits an
# avatar_url from the response. This contradicts the spec, see #13137. # avatar_url from the response. This contradicts the spec, see
# https://github.com/matrix-org/synapse/issues/13137.
return channel.json_body.get("avatar_url") return channel.json_body.get("avatar_url")
@unittest.override_config({"max_avatar_size": 50}) @unittest.override_config({"max_avatar_size": 50})

View File

@ -888,7 +888,8 @@ class RoomsCreateTestCase(RoomBase):
) )
def test_room_creation_ratelimiting(self) -> None: def test_room_creation_ratelimiting(self) -> None:
""" """
Regression test for #14312, where ratelimiting was made too strict. Regression test for https://github.com/matrix-org/synapse/issues/14312,
where ratelimiting was made too strict.
Clients should be able to create 10 rooms in a row Clients should be able to create 10 rooms in a row
without hitting rate limits, using default rate limit config. without hitting rate limits, using default rate limit config.
(We override rate limiting config back to its default value.) (We override rate limiting config back to its default value.)

View File

@ -642,7 +642,7 @@ class SyncCacheTestCase(unittest.HomeserverTestCase):
def test_noop_sync_does_not_tightloop(self) -> None: def test_noop_sync_does_not_tightloop(self) -> None:
"""If the sync times out, we shouldn't cache the result """If the sync times out, we shouldn't cache the result
Essentially a regression test for #8518. Essentially a regression test for https://github.com/matrix-org/synapse/issues/8518.
""" """
self.user_id = self.register_user("kermit", "monkey") self.user_id = self.register_user("kermit", "monkey")
self.tok = self.login("kermit", "monkey") self.tok = self.login("kermit", "monkey")

Some files were not shown because too many files have changed in this diff