Merge branch 'develop' into fix/prajjawal-9443

This commit is contained in:
Prajjawal Agarwal 2023-08-23 13:35:24 -04:00 committed by GitHub
commit 41ac3b906e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 369 additions and 28 deletions

1
changelog.d/16063.misc Normal file
View File

@ -0,0 +1 @@
Fix building the nix development environment on MacOS systems.

View File

@ -0,0 +1 @@
Suppress notifications from message edits per [MSC3958](https://github.com/matrix-org/matrix-spec-proposals/pull/3958).

1
changelog.d/16124.bugfix Normal file
View File

@ -0,0 +1 @@
Filter out user agent references to the sliding sync proxy and rust-sdk from the user_daily_visits table to ensure that Element X can be represented fully.

1
changelog.d/16133.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a rare race that could block new events from being sent for up to two minutes. Introduced in v1.90.0.

View File

@ -1 +1 @@
Clean-up calling `setup_background_tasks` in unit tests. Improve presence tests.

1
changelog.d/16151.misc Normal file
View File

@ -0,0 +1 @@
Improve presence tests.

1
changelog.d/16156.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in 1.87 where synapse would send an excessive amount of federation requests to servers which have been offline for a long time. Contributed by Nico.

1
changelog.d/16160.misc Normal file
View File

@ -0,0 +1 @@
Reduce DB contention on worker locks.

1
changelog.d/16164.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in 1.87 where synapse would send an excessive amount of federation requests to servers which have been offline for a long time. Contributed by Nico.

1
changelog.d/16165.misc Normal file
View File

@ -0,0 +1 @@
Task scheduler: mark task as active if we are scheduling as soon as possible.

1
changelog.d/16169.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a rare race that could block new events from being sent for up to two minutes. Introduced in v1.90.0.

8
flake.lock generated
View File

@ -8,16 +8,16 @@
"pre-commit-hooks": "pre-commit-hooks" "pre-commit-hooks": "pre-commit-hooks"
}, },
"locked": { "locked": {
"lastModified": 1690534632, "lastModified": 1688058187,
"narHash": "sha256-kOXS9x5y17VKliC7wZxyszAYrWdRl1JzggbQl0gyo94=", "narHash": "sha256-ipDcc7qrucpJ0+0eYNlwnE+ISTcq4m03qW+CWUshRXI=",
"owner": "cachix", "owner": "cachix",
"repo": "devenv", "repo": "devenv",
"rev": "6568e7e485a46bbf32051e4d6347fa1fed8b2f25", "rev": "c8778e3dc30eb9043e218aaa3861d42d4992de77",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "cachix", "owner": "cachix",
"ref": "main", "ref": "v0.6.3",
"repo": "devenv", "repo": "devenv",
"type": "github" "type": "github"
} }

View File

@ -45,7 +45,7 @@
# Output a development shell for x86_64/aarch64 Linux/Darwin (MacOS). # Output a development shell for x86_64/aarch64 Linux/Darwin (MacOS).
systems.url = "github:nix-systems/default"; systems.url = "github:nix-systems/default";
# A development environment manager built on Nix. See https://devenv.sh. # A development environment manager built on Nix. See https://devenv.sh.
devenv.url = "github:cachix/devenv/main"; devenv.url = "github:cachix/devenv/v0.6.3";
# Rust toolchain. # Rust toolchain.
rust-overlay.url = "github:oxalica/rust-overlay"; rust-overlay.url = "github:oxalica/rust-overlay";
}; };

View File

@ -197,7 +197,6 @@ fn bench_eval_message(b: &mut Bencher) {
false, false,
false, false,
false, false,
false,
); );
b.iter(|| eval.run(&rules, Some("bob"), Some("person"))); b.iter(|| eval.run(&rules, Some("bob"), Some("person")));

View File

@ -228,7 +228,7 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
// We don't want to notify on edits *unless* the edit directly mentions a // We don't want to notify on edits *unless* the edit directly mentions a
// user, which is handled above. // user, which is handled above.
PushRule { PushRule {
rule_id: Cow::Borrowed("global/override/.org.matrix.msc3958.suppress_edits"), rule_id: Cow::Borrowed("global/override/.m.rule.suppress_edits"),
priority_class: 5, priority_class: 5,
conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventPropertyIs( conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventPropertyIs(
EventPropertyIsCondition { EventPropertyIsCondition {

View File

@ -564,7 +564,7 @@ fn test_requires_room_version_supports_condition() {
}; };
let rules = PushRules::new(vec![custom_rule]); let rules = PushRules::new(vec![custom_rule]);
result = evaluator.run( result = evaluator.run(
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false), &FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true),
None, None,
None, None,
); );

View File

@ -527,7 +527,6 @@ pub struct FilteredPushRules {
msc1767_enabled: bool, msc1767_enabled: bool,
msc3381_polls_enabled: bool, msc3381_polls_enabled: bool,
msc3664_enabled: bool, msc3664_enabled: bool,
msc3958_suppress_edits_enabled: bool,
} }
#[pymethods] #[pymethods]
@ -539,7 +538,6 @@ impl FilteredPushRules {
msc1767_enabled: bool, msc1767_enabled: bool,
msc3381_polls_enabled: bool, msc3381_polls_enabled: bool,
msc3664_enabled: bool, msc3664_enabled: bool,
msc3958_suppress_edits_enabled: bool,
) -> Self { ) -> Self {
Self { Self {
push_rules, push_rules,
@ -547,7 +545,6 @@ impl FilteredPushRules {
msc1767_enabled, msc1767_enabled,
msc3381_polls_enabled, msc3381_polls_enabled,
msc3664_enabled, msc3664_enabled,
msc3958_suppress_edits_enabled,
} }
} }
@ -584,12 +581,6 @@ impl FilteredPushRules {
return false; return false;
} }
if !self.msc3958_suppress_edits_enabled
&& rule.rule_id == "global/override/.org.matrix.msc3958.suppress_edits"
{
return false;
}
true true
}) })
.map(|r| { .map(|r| {

View File

@ -46,7 +46,6 @@ class FilteredPushRules:
msc1767_enabled: bool, msc1767_enabled: bool,
msc3381_polls_enabled: bool, msc3381_polls_enabled: bool,
msc3664_enabled: bool, msc3664_enabled: bool,
msc3958_suppress_edits_enabled: bool,
): ... ): ...
def rules(self) -> Collection[Tuple[PushRule, bool]]: ... def rules(self) -> Collection[Tuple[PushRule, bool]]: ...

View File

@ -383,11 +383,6 @@ class ExperimentalConfig(Config):
# MSC3391: Removing account data. # MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False) self.msc3391_enabled = experimental.get("msc3391_enabled", False)
# MSC3959: Do not generate notifications for edits.
self.msc3958_supress_edit_notifs = experimental.get(
"msc3958_supress_edit_notifs", False
)
# MSC3967: Do not require UIA when first uploading cross signing keys # MSC3967: Do not require UIA when first uploading cross signing keys
self.msc3967_enabled = experimental.get("msc3967_enabled", False) self.msc3967_enabled = experimental.get("msc3967_enabled", False)

View File

@ -579,6 +579,11 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
device_id: Optional[str], device_id: Optional[str],
now: Optional[int] = None, now: Optional[int] = None,
) -> None: ) -> None:
# The sync proxy continuously triggers /sync even if the user is not
# present so should be excluded from user_ips entries.
if user_agent == "sync-v3-proxy-":
return
if not now: if not now:
now = int(self._clock.time_msec()) now = int(self._clock.time_msec())
key = (user_id, access_token, ip) key = (user_id, access_token, ip)

View File

@ -88,7 +88,6 @@ def _load_rules(
msc1767_enabled=experimental_config.msc1767_enabled, msc1767_enabled=experimental_config.msc1767_enabled,
msc3664_enabled=experimental_config.msc3664_enabled, msc3664_enabled=experimental_config.msc3664_enabled,
msc3381_polls_enabled=experimental_config.msc3381_polls_enabled, msc3381_polls_enabled=experimental_config.msc3381_polls_enabled,
msc3958_suppress_edits_enabled=experimental_config.msc3958_supress_edit_notifs,
) )
return filtered_rules return filtered_rules

View File

@ -256,8 +256,10 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_interval = EXCLUDED.retry_interval retry_interval = EXCLUDED.retry_interval
WHERE WHERE
EXCLUDED.retry_interval = 0 EXCLUDED.retry_interval = 0
OR EXCLUDED.retry_last_ts = 0
OR destinations.retry_interval IS NULL OR destinations.retry_interval IS NULL
OR destinations.retry_interval < EXCLUDED.retry_interval OR destinations.retry_interval < EXCLUDED.retry_interval
OR destinations.retry_last_ts < EXCLUDED.retry_last_ts
""" """
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))

View File

@ -0,0 +1,37 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql`
-- Reduce the number of writes we do on this table.
--
-- Note: that we still want to lock the row here (i.e. still do a `DO UPDATE
-- SET`) so that we serialize updates.
CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
BEGIN
INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
ON CONFLICT (lock_name, lock_key)
DO UPDATE SET write_lock = NEW.write_lock
WHERE OLD.write_lock != NEW.write_lock;
RETURN NEW;
END
$$
LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger ON worker_read_write_locks;
CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
FOR EACH ROW
EXECUTE PROCEDURE upsert_read_write_lock_parent();

View File

@ -0,0 +1,71 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Remove a previous attempt to avoid deadlocks
DROP TRIGGER IF EXISTS delete_read_write_lock_parent_before_trigger ON worker_read_write_locks;
DROP FUNCTION IF EXISTS delete_read_write_lock_parent_before;
-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
-- update the `worker_read_write_locks_mode.token` to match another instance
-- that has currently acquired the lock, or we delete the row if nobody has
-- currently acquired a lock.
CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$
DECLARE
new_token TEXT;
mode_row_token TEXT;
BEGIN
-- Only update the token in `_mode` if its our token. This prevents
-- deadlocks.
--
-- We shove the token into `mode_row_token`, as otherwise postgres complains
-- we're not using the returned data.
SELECT token INTO mode_row_token FROM worker_read_write_locks_mode
WHERE
lock_name = OLD.lock_name
AND lock_key = OLD.lock_key
AND token = OLD.token
FOR UPDATE;
IF NOT FOUND THEN
RETURN NEW;
END IF;
SELECT token INTO new_token FROM worker_read_write_locks
WHERE
lock_name = OLD.lock_name
AND lock_key = OLD.lock_key
LIMIT 1 FOR UPDATE SKIP LOCKED;
IF NOT FOUND THEN
DELETE FROM worker_read_write_locks_mode
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key AND token = OLD.token;
ELSE
UPDATE worker_read_write_locks_mode
SET token = new_token
WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
END IF;
RETURN NEW;
END
$$
LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS delete_read_write_lock_parent_trigger ON worker_read_write_locks;
CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks
FOR EACH ROW
EXECUTE PROCEDURE delete_read_write_lock_parent();

View File

@ -154,13 +154,15 @@ class TaskScheduler:
f"No function associated with action {action} of the scheduled task" f"No function associated with action {action} of the scheduled task"
) )
status = TaskStatus.SCHEDULED
if timestamp is None or timestamp < self._clock.time_msec(): if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec() timestamp = self._clock.time_msec()
status = TaskStatus.ACTIVE
task = ScheduledTask( task = ScheduledTask(
random_string(16), random_string(16),
action, action,
TaskStatus.SCHEDULED, status,
timestamp, timestamp,
resource_id, resource_id,
params, params,

View File

@ -38,6 +38,7 @@ from synapse.handlers.presence import (
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client import room from synapse.rest.client import room
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.database import LoggingDatabaseConnection
from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util import Clock from synapse.util import Clock
@ -513,6 +514,121 @@ class PresenceTimeoutTestCase(unittest.TestCase):
self.assertEqual(state, new_state) self.assertEqual(state, new_state)
class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
def default_config(self) -> JsonDict:
config = super().default_config()
# Disable background tasks on this worker so that the PresenceHandler isn't
# loaded until we request it.
config["run_background_tasks_on"] = "other"
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.user_id = f"@test:{self.hs.config.server.server_name}"
# Move the reactor to the initial time.
self.reactor.advance(1000)
now = self.clock.time_msec()
main_store = hs.get_datastores().main
self.get_success(
main_store.update_presence(
[
UserPresenceState(
user_id=self.user_id,
state=PresenceState.ONLINE,
last_active_ts=now,
last_federation_update_ts=now,
last_user_sync_ts=now,
status_msg=None,
currently_active=True,
)
]
)
)
# Regenerate the preloaded presence information on PresenceStore.
def refill_presence(db_conn: LoggingDatabaseConnection) -> None:
main_store._presence_on_startup = main_store._get_active_presence(db_conn)
self.get_success(main_store.db_pool.runWithConnection(refill_presence))
def test_restored_presence_idles(self) -> None:
"""The presence state restored from the database should not persist forever."""
# Get the handler (which kicks off a bunch of timers).
presence_handler = self.hs.get_presence_handler()
# Assert the user is online.
state = self.get_success(
presence_handler.get_state(UserID.from_string(self.user_id))
)
self.assertEqual(state.state, PresenceState.ONLINE)
# Advance such that the user should timeout.
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
self.reactor.pump([5])
# Check that the user is now offline.
state = self.get_success(
presence_handler.get_state(UserID.from_string(self.user_id))
)
self.assertEqual(state.state, PresenceState.OFFLINE)
@parameterized.expand(
[
(PresenceState.BUSY, PresenceState.BUSY),
(PresenceState.ONLINE, PresenceState.ONLINE),
(PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
# Offline syncs don't update the state.
(PresenceState.OFFLINE, PresenceState.ONLINE),
]
)
@unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
def test_restored_presence_online_after_sync(
self, sync_state: str, expected_state: str
) -> None:
"""
The presence state restored from the database should be overridden with sync after a timeout.
Args:
sync_state: The presence state of the new sync.
expected_state: The expected presence right after the sync.
"""
# Get the handler (which kicks off a bunch of timers).
presence_handler = self.hs.get_presence_handler()
# Assert the user is online, as restored.
state = self.get_success(
presence_handler.get_state(UserID.from_string(self.user_id))
)
self.assertEqual(state.state, PresenceState.ONLINE)
# Advance slightly and sync.
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
self.get_success(
presence_handler.user_syncing(
self.user_id, sync_state != PresenceState.OFFLINE, sync_state
)
)
# Assert the user is in the expected state.
state = self.get_success(
presence_handler.get_state(UserID.from_string(self.user_id))
)
self.assertEqual(state.state, expected_state)
# Advance such that the user's preloaded data times out, but not the new sync.
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
self.reactor.pump([5])
# Check that the user is in the sync state (as the client is currently syncing still).
state = self.get_success(
presence_handler.get_state(UserID.from_string(self.user_id))
)
self.assertEqual(state.state, sync_state)
class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
user_id = "@test:server" user_id = "@test:server"
user_id_obj = UserID.from_string(user_id) user_id_obj = UserID.from_string(user_id)

View File

@ -382,7 +382,6 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
) )
) )
@override_config({"experimental_features": {"msc3958_supress_edit_notifs": True}})
def test_suppress_edits(self) -> None: def test_suppress_edits(self) -> None:
"""Under the default push rules, event edits should not generate notifications.""" """Under the default push rules, event edits should not generate notifications."""
bulk_evaluator = BulkPushRuleEvaluator(self.hs) bulk_evaluator = BulkPushRuleEvaluator(self.hs)

View File

@ -654,6 +654,71 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
r, r,
) )
def test_invalid_user_agents_are_ignored(self) -> None:
# First make sure we have completed all updates.
self.wait_for_background_updates()
user_id1 = "@user1:id"
user_id2 = "@user2:id"
device_id1 = "MY_DEVICE1"
device_id2 = "MY_DEVICE2"
access_token1 = "access_token1"
access_token2 = "access_token2"
# Insert a user IP 1
self.get_success(
self.store.store_device(
user_id1,
device_id1,
"display name1",
)
)
# Insert a user IP 2
self.get_success(
self.store.store_device(
user_id2,
device_id2,
"display name2",
)
)
self.get_success(
self.store.insert_client_ip(
user_id1, access_token1, "ip", "sync-v3-proxy-", device_id1
)
)
self.get_success(
self.store.insert_client_ip(
user_id2, access_token2, "ip", "user_agent", device_id2
)
)
# Force persisting to disk
self.reactor.advance(200)
# We should see that in the DB
result = self.get_success(
self.store.db_pool.simple_select_list(
table="user_ips",
keyvalues={},
retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"],
desc="get_user_ip_and_agents",
)
)
# ensure user1 is filtered out
self.assertEqual(
result,
[
{
"access_token": access_token2,
"ip": "ip",
"user_agent": "user_agent",
"device_id": device_id2,
"last_seen": 0,
}
],
)
class ClientIpAuthTestCase(unittest.HomeserverTestCase): class ClientIpAuthTestCase(unittest.HomeserverTestCase):
servlets = [ servlets = [

View File

@ -108,3 +108,54 @@ class RetryLimiterTestCase(HomeserverTestCase):
new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertIsNone(new_timings) self.assertIsNone(new_timings)
def test_max_retry_interval(self) -> None:
"""Test that `destination_max_retry_interval` setting works as expected"""
store = self.hs.get_datastores().main
destination_max_retry_interval_ms = (
self.hs.config.federation.destination_max_retry_interval_ms
)
self.get_success(get_retry_limiter("test_dest", self.clock, store))
self.pump(1)
failure_ts = self.clock.time_msec()
# Simulate reaching destination_max_retry_interval
self.get_success(
store.set_destination_retry_timings(
"test_dest",
failure_ts=failure_ts,
retry_last_ts=failure_ts,
retry_interval=destination_max_retry_interval_ms,
)
)
# Check it fails
self.get_failure(
get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
)
# Get past retry_interval and we can try again, and still throw an error to continue the backoff
self.reactor.advance(destination_max_retry_interval_ms / 1000 + 1)
limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
self.pump(1)
try:
with limiter:
self.pump(1)
raise AssertionError("argh")
except AssertionError:
pass
self.pump()
# retry_interval does not increase and stays at destination_max_retry_interval_ms
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
assert new_timings is not None
self.assertEqual(new_timings.retry_interval, destination_max_retry_interval_ms)
# Check it fails
self.get_failure(
get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
)