From f10ce8944b8ef4c33694654c397bfcda44c6124a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 11:10:28 +0100 Subject: [PATCH 01/20] Don't double json encode federation replication data --- synapse/app/federation_sender.py | 4 +--- synapse/federation/send_queue.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index cbddc80ca9..145c01f3a3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,6 @@ from daemonize import Daemonize import sys import logging import gc -import ujson as json logger = logging.getLogger("synapse.app.appservice") @@ -290,8 +289,7 @@ class FederationSenderHandler(object): # Parse the rows in the stream for row in rows: typ = row.type - content_js = row.data - content = json.loads(content_js) + content = row.data if typ == send_queue.PRESENCE_TYPE: destination = content["destination"] diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4bde66fbf8..78c852ed69 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,6 @@ from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict -import ujson metrics = synapse.metrics.get_metrics_for(__name__) @@ -258,10 +257,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, ujson.dumps({ + rows.append((key, PRESENCE_TYPE, { "destination": dest, "state": self.presence_map[user_id].as_dict(), - }))) + })) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -271,10 +270,10 @@ class FederationRemoteSendQueue(object): for (pos, (destination, edu_key)) in keyed_edus: rows.append( - (pos, KEYED_EDU_TYPE, ujson.dumps({ + (pos, KEYED_EDU_TYPE, { "key": edu_key, "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - })) + }) ) # Fetch changed edus @@ -284,7 +283,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) + rows.append((pos, EDU_TYPE, edu.get_internal_dict())) # Fetch changed failures keys = self.failures.keys() @@ -293,10 +292,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, { "destination": destination, "failure": failure, - }))) + })) # Fetch changed device messages keys = self.device_messages.keys() @@ -305,9 +304,9 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + rows.append((pos, DEVICE_MESSAGE_TYPE, { "destination": destination, - }))) + })) # Sort rows based on pos rows.sort() From 96b9b6c1275087acf57faf1306bcc392dbd9f842 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 11:34:20 +0100 Subject: [PATCH 02/20] Don't double json encode typing replication data --- synapse/app/synchrotron.py | 4 +--- synapse/handlers/typing.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a1ef5dfa77..1fac021ea9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -62,7 +62,6 @@ import sys import logging import contextlib import gc -import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -215,9 +214,8 @@ class SynchrotronTyping(object): self._latest_room_serial = token for row in rows: - typing = json.loads(row.user_ids) self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = typing + self._room_typing[row.room_id] = row.user_ids class SynchrotronApplicationService(object): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d6809862e0..3b7818af5c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id import logging from collections import namedtuple -import ujson as json logger = logging.getLogger(__name__) @@ -288,8 +287,7 @@ class TypingHandler(object): for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: typing = self._room_typing[room_id] - typing_bytes = json.dumps(list(typing), ensure_ascii=False) - rows.append((serial, room_id, typing_bytes)) + rows.append((serial, room_id, list(typing))) rows.sort() return rows From b5cb6347a4d9b9fb5c21a2a6e3a2852c14b837fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 13:25:40 +0100 Subject: [PATCH 03/20] Don't immediately notify the master about users whose syncs have gone away --- synapse/app/synchrotron.py | 40 ++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a1ef5dfa77..a5dfbef8da 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -120,12 +120,46 @@ class SynchrotronPresence(object): for state in active_presence } + # user_id -> last_sync_ms. Lists the users that have stopped syncing + # but we haven't notified the master of that yet + self.users_going_offline = {} + + self._send_stop_syncing_loop = self.clock.looping_call( + self.send_stop_syncing, 10 * 1000 + ) + self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + def mark_as_coming_online(self, user_id): + """A user has started syncing. Send a UserSync to the master, unless they + had recently stopped syncing. + """ + going_offline = self.users_going_offline.pop(user_id, None) + if not going_offline: + self.send_user_sync(user_id, True, self.clock.time_msec()) + + def mark_as_going_offline(self, user_id): + """A user has stopped syncing. We wait before notifying the master as + its likely they'll come back soon. This allows us to avoid sending + a stopped syncing immediately followed by a started syncing notification + to the master + """ + self.users_going_offline[user_id] = self.clock.time_msec() + + def send_stop_syncing(self): + """Check if there are any users who have stopped syncing a while ago + and haven't come back yet. If there are poke the master about them. + """ + now = self.clock.time_msec() + for user_id, last_sync_ms in self.users_going_offline.items(): + if now - last_sync_ms > 10 * 1000: + self.users_going_offline.pop(user_id, None) + self.send_user_sync(user_id, False, last_sync_ms) + def set_state(self, user, state, ignore_status_msg=False): # TODO Hows this supposed to work? pass @@ -142,8 +176,7 @@ class SynchrotronPresence(object): # If we went from no in flight sync to some, notify replication if self.user_to_num_current_syncs[user_id] == 1: - now = self.clock.time_msec() - self.send_user_sync(user_id, True, now) + self.mark_as_coming_online(user_id) def _end(): # We check that the user_id is in user_to_num_current_syncs because @@ -154,8 +187,7 @@ class SynchrotronPresence(object): # If we went from one in flight sync to non, notify replication if self.user_to_num_current_syncs[user_id] == 0: - now = self.clock.time_msec() - self.send_user_sync(user_id, False, now) + self.mark_as_going_offline(user_id) @contextlib.contextmanager def _user_syncing(): From fcc803b2bf9bd389a7f7d227a7c0386d22d37645 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 17:13:44 +0100 Subject: [PATCH 04/20] Add log lines --- synapse/replication/tcp/protocol.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d4d672aafe..19b1ce504f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -220,6 +220,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): logger.exception("[%s] Failed to handle line: %r", self.id(), line) def close(self): + logger.warn("[%s] Closing connection", self.id()) self.time_we_closed = self.clock.time_msec() self.transport.loseConnection() self.on_connection_closed() @@ -505,7 +506,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) - self.transport.abortConnection() + self.send_error("Wrong remote") def on_RDATA(self, cmd): try: From 69b3fd485db2da43c15860d4ed41a0be105aab32 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 09:36:38 +0100 Subject: [PATCH 05/20] Fix incorrect type when using InvalidateCacheCommand --- synapse/replication/tcp/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 251d3afcf4..90fb6c1336 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -175,7 +175,7 @@ class ReplicationClientHandler(object): def send_invalidate_cache(self, cache_func, keys): """Poke the master to invalidate a cache. """ - cmd = InvalidateCacheCommand(cache_func, keys) + cmd = InvalidateCacheCommand(cache_func.__name__, keys) self.send_command(cmd) def await_sync(self, data): From dbf87282d39b9ee503f07d68c99ccc1624fc6e17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 13:11:21 +0100 Subject: [PATCH 06/20] Docs --- synapse/app/synchrotron.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a5dfbef8da..e0716a6226 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -137,9 +137,13 @@ class SynchrotronPresence(object): def mark_as_coming_online(self, user_id): """A user has started syncing. Send a UserSync to the master, unless they had recently stopped syncing. + + Args: + user_id (str) """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: + # Safe to skip if we haven't yet told the master they were offline self.send_user_sync(user_id, True, self.clock.time_msec()) def mark_as_going_offline(self, user_id): @@ -147,6 +151,9 @@ class SynchrotronPresence(object): its likely they'll come back soon. This allows us to avoid sending a stopped syncing immediately followed by a started syncing notification to the master + + Args: + user_id (str) """ self.users_going_offline[user_id] = self.clock.time_msec() From ad544c803a0e7ba99f2c7b6ce61f7c2faa962f3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 13:28:52 +0100 Subject: [PATCH 07/20] Document types of the replication streams --- synapse/replication/tcp/streams.py | 104 +++++++++++++++++++++-------- 1 file changed, 76 insertions(+), 28 deletions(-) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 4de4ebe84d..967b459e0e 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -36,34 +36,82 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 10000 -EventStreamRow = namedtuple("EventStreamRow", - ("event_id", "room_id", "type", "state_key", "redacts")) -BackfillStreamRow = namedtuple("BackfillStreamRow", - ("event_id", "room_id", "type", "state_key", "redacts")) -PresenceStreamRow = namedtuple("PresenceStreamRow", - ("user_id", "state", "last_active_ts", - "last_federation_update_ts", "last_user_sync_ts", - "status_msg", "currently_active")) -TypingStreamRow = namedtuple("TypingStreamRow", - ("room_id", "user_ids")) -ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", - ("room_id", "receipt_type", "user_id", "event_id", - "data")) -PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) -PushersStreamRow = namedtuple("PushersStreamRow", - ("user_id", "app_id", "pushkey", "deleted",)) -CachesStreamRow = namedtuple("CachesStreamRow", - ("cache_func", "keys", "invalidation_ts",)) -PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", - ("room_id", "visibility", "appservice_id", - "network_id",)) -DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",)) -ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) -FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",)) -TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", - ("user_id", "room_id", "data")) -AccountDataStreamRow = namedtuple("AccountDataStream", - ("user_id", "room_id", "data_type", "data")) +EventStreamRow = namedtuple("EventStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +BackfillStreamRow = namedtuple("BackfillStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +PresenceStreamRow = namedtuple("PresenceStreamRow", ( + "user_id", # str + "state", # str + "last_active_ts", # int + "last_federation_update_ts", # int + "last_user_sync_ts", # int + "status_msg", # str + "currently_active", # bool +)) +TypingStreamRow = namedtuple("TypingStreamRow", ( + "room_id", # str + "user_ids", # list(str) +)) +ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", ( + "room_id", # str + "receipt_type", # str + "user_id", # str + "event_id", # str + "data", # dict +)) +PushRulesStreamRow = namedtuple("PushRulesStreamRow", ( + "user_id", # str +)) +PushersStreamRow = namedtuple("PushersStreamRow", ( + "user_id", # str + "app_id", # str + "pushkey", # str + "deleted", # bool +)) +CachesStreamRow = namedtuple("CachesStreamRow", ( + "cache_func", # str + "keys", # list(str) + "invalidation_ts", # int +)) +PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", ( + "room_id", # str + "visibility", # str + "appservice_id", # str, optional + "network_id", # str, optional +)) +DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( + "user_id", # str + "destination", # str +)) +ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( + "entity", # str +)) +FederationStreamRow = namedtuple("FederationStreamRow", ( + "type", # str + "data", # dict +)) +TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( + "user_id", # str + "room_id", # str + "data", # dict +)) +AccountDataStreamRow = namedtuple("AccountDataStream", ( + "user_id", # str + "room_id", # str + "data_type", # str + "data", # dict +)) class Stream(object): From 391712a4f9c43359c9f6c36a6ed51800b370ffc0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 13:35:00 +0100 Subject: [PATCH 08/20] Comment --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e0716a6226..67d9210f2a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -143,7 +143,7 @@ class SynchrotronPresence(object): """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: - # Safe to skip if we haven't yet told the master they were offline + # Safe to skip because we haven't yet told the master they were offline self.send_user_sync(user_id, True, self.clock.time_msec()) def mark_as_going_offline(self, user_id): From 877c029c16c41b29efde120af7894f5ace43c8fd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 15:51:12 +0100 Subject: [PATCH 09/20] Use iteritems --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 53baf3e79a..9ed5af3cb4 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -596,14 +596,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.items() if not state] + missing = [user_id for user_id, state in states.iteritems() if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.items() if not state] + missing = [user_id for user_id, state in states.iteritems() if not state] if missing: new = { user_id: UserPresenceState.default(user_id) From 8a1137ceab54b5b2643f1be3995b2977f5cd2c63 Mon Sep 17 00:00:00 2001 From: Kim Brose Date: Thu, 6 Apr 2017 17:10:20 +0200 Subject: [PATCH 10/20] fix typo in synctl help --- synapse/app/synctl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index c045588866..e84045c7d1 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -98,7 +98,7 @@ def main(): "configfile", nargs="?", default="homeserver.yaml", - help="the homeserver config file, defaults to homserver.yaml", + help="the homeserver config file, defaults to homeserver.yaml", ) parser.add_argument( "-w", "--worker", From d72667fcce6f751c55ea510c964d68499cb67305 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 10:10:49 +0100 Subject: [PATCH 11/20] Speed up get_current_state_ids Using _simple_select_list is fairly expensive for functions that return a lot of rows and/or get called a lot. (This is because it carefully constructs a list of dicts). get_current_state_ids gets called a lot on startup and e.g. when the IRC bridge decided to send tonnes of joins/leaves (as it invalidates the cache). We therefore replace it with a custon txn function that builds up the final result dict without building up and intermediate representation. --- synapse/storage/state.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fb23f6f462..86e5fdb76b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string from synapse.storage.engines import PostgresEngine @@ -69,17 +69,24 @@ class StateStore(SQLBaseStore): where_clause="type='m.room.member'", ) - @cachedInlineCallbacks(max_entries=100000, iterable=True) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): - rows = yield self._simple_select_list( - table="current_state_events", - keyvalues={"room_id": room_id}, - retcols=["event_id", "type", "state_key"], - desc="_calculate_state_delta", + def _get_current_state_ids_txn(txn): + txn.execute( + """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? + """, + (room_id,) + ) + + return { + (r[0], r[1]): r[2] for r in txn + } + + return self.runInteraction( + "get_current_state_ids", + _get_current_state_ids_txn, ) - defer.returnValue({ - (r["type"], r["state_key"]): r["event_id"] for r in rows - }) @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): From 449d1297cae8bb05140b257f6e08e4cc036bf481 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:48:27 +0100 Subject: [PATCH 12/20] Fix up federation SendQueue and document types --- synapse/app/federation_sender.py | 66 +-------- synapse/federation/send_queue.py | 246 +++++++++++++++++++++++++++---- 2 files changed, 221 insertions(+), 91 deletions(-) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 145c01f3a3..477e16e0fa 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.federation.units import Edu from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -277,69 +275,7 @@ class FederationSenderHandler(object): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - # The federation stream containis a bunch of different types of - # rows that need to be handled differently. We parse the rows, put - # them into the appropriate collection and then send them off. - presence_to_send = {} - keyed_edus = {} - edus = {} - failures = {} - device_destinations = set() - - # Parse the rows in the stream - for row in rows: - typ = row.type - content = row.data - - if typ == send_queue.PRESENCE_TYPE: - destination = content["destination"] - state = UserPresenceState.from_dict(content["state"]) - - presence_to_send.setdefault(destination, []).append(state) - elif typ == send_queue.KEYED_EDU_TYPE: - key = content["key"] - edu = Edu(**content["edu"]) - - keyed_edus.setdefault( - edu.destination, {} - )[(edu.destination, tuple(key))] = edu - elif typ == send_queue.EDU_TYPE: - edu = Edu(**content) - - edus.setdefault(edu.destination, []).append(edu) - elif typ == send_queue.FAILURE_TYPE: - destination = content["destination"] - failure = content["failure"] - - failures.setdefault(destination, []).append(failure) - elif typ == send_queue.DEVICE_MESSAGE_TYPE: - device_destinations.add(content["destination"]) - else: - raise Exception("Unrecognised federation type: %r", typ) - - # We've finished collecting, send everything off - for destination, states in presence_to_send.items(): - self.federation_sender.send_presence(destination, states) - - for destination, edu_map in keyed_edus.items(): - for key, edu in edu_map.items(): - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) - - for destination, edu_list in edus.items(): - for edu in edu_list: - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) - - for destination, failure_list in failures.items(): - for failure in failure_list: - self.federation_sender.send_failure(destination, failure) - - for destination in device_destinations: - self.federation_sender.send_device_messages(destination) - + send_queue.process_rows_for_federation(self.federation_sender, rows) preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 78c852ed69..8a6392c697 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,22 +31,17 @@ Events are replicated via a separate events stream. from .units import Edu +from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict +from collections import namedtuple metrics = synapse.metrics.get_metrics_for(__name__) -PRESENCE_TYPE = "p" -KEYED_EDU_TYPE = "k" -EDU_TYPE = "e" -FAILURE_TYPE = "f" -DEVICE_MESSAGE_TYPE = "d" - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -257,10 +252,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, { - "destination": dest, - "state": self.presence_map[user_id].as_dict(), - })) + rows.append((key, PresenceRow( + destination=dest, + state=self.presence_map[user_id], + ))) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -269,12 +264,10 @@ class FederationRemoteSendQueue(object): keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: - rows.append( - (pos, KEYED_EDU_TYPE, { - "key": edu_key, - "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - }) - ) + rows.append((pos, KeyedEduRow( + key=edu_key, + edu=self.keyed_edu[(destination, edu_key)], + ))) # Fetch changed edus keys = self.edus.keys() @@ -283,7 +276,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, edu.get_internal_dict())) + rows.append((pos, EduRow(edu))) # Fetch changed failures keys = self.failures.keys() @@ -292,10 +285,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, { - "destination": destination, - "failure": failure, - })) + rows.append((pos, FailureRow( + destination=destination, + failure=failure, + ))) # Fetch changed device messages keys = self.device_messages.keys() @@ -304,11 +297,212 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, { - "destination": destination, - })) + rows.append((pos, DeviceRow( + destination=destination, + ))) # Sort rows based on pos rows.sort() - return rows + return [(pos, row.TypeId, row.to_data()) for pos, row in rows] + + +class BaseFederationRow(object): + TypeId = None + + @staticmethod + def from_data(data): + """Parse the data from the federation stream into a row. + """ + raise NotImplementedError() + + def to_data(self): + """Serialize this row to be sent over the federation stream + """ + raise NotImplementedError() + + def add_to_buffer(self, buff): + """Add this row to the appropriate field in the buffer ready for this + to be sent over federation. + + We use a buffer so that we can batch up events that have come in at + the same time and send them all at once. + + Args: + buff (BufferedToSend) + """ + raise NotImplementedError() + + +class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( + "destination", # str + "state", # UserPresenceState +))): + TypeId = "p" + + @staticmethod + def from_data(data): + return PresenceRow( + destination=data["destination"], + state=UserPresenceState.from_dict(data["state"]) + ) + + def to_data(self): + return { + "destination": self.destination, + "state": self.state.as_dict() + } + + def add_to_buffer(self, buff): + buff.presence.setdefault(self.destination, []).append(self.state) + + +class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( + "key", # tuple(str) - the edu key passed to send_edu + "edu", # Edu +))): + TypeId = "k" + + @staticmethod + def from_data(data): + return KeyedEduRow( + key=tuple(data["key"]), + edu=Edu(**data["edu"]), + ) + + def to_data(self): + return { + "key": self.key, + "edu": self.edu.get_internal_dict(), + } + + def add_to_buffer(self, buff): + buff.keyed_edus.setdefault( + self.edu.destination, {} + )[self.key] = self.edu + + +class EduRow(BaseFederationRow, namedtuple("EduRow", ( + "edu", # Edu +))): + TypeId = "e" + + @staticmethod + def from_data(data): + return EduRow(Edu(**data)) + + def to_data(self): + return self.edu.get_internal_dict() + + def add_to_buffer(self, buff): + buff.edus.setdefault(self.edu.destination, []).append(self.edu) + + +class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( + "destination", # str + "failure", +))): + TypeId = "f" + + @staticmethod + def from_data(data): + return FailureRow( + destination=data["destination"], + failure=data["failure"], + ) + + def to_data(self): + return { + "destination": self.destination, + "failure": self.failure, + } + + def add_to_buffer(self, buff): + buff.failures.setdefault(self.destination, []).append(self.failure) + + +class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( + "destination", # str +))): + TypeId = "d" + + @staticmethod + def from_data(data): + return DeviceRow(destination=data) + + def to_data(self): + return self.destination + + def add_to_buffer(self, buff): + buff.device_destinations.add(self.destination) + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + PresenceRow, + KeyedEduRow, + EduRow, + FailureRow, + DeviceRow, + ) +} + + +BufferedToSend = namedtuple("BufferedToSend", ( + "presence", # dict of destination -> [UserPresenceState] + "keyed_edus", # dict of destination -> { key -> Edu } + "edus", # dict of destination -> [Edu] + "failures", # dict of destination -> [failures] + "device_destinations", # set of destinations +)) + + +def process_rows_for_federation(federation_sender, rows): + """Parse a list of rows from the federation stream and them send them out. + + Args: + federation_sender (TransactionQueue) + rows (list(FederationStreamRow)) + """ + + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. + + buff = BufferedToSend( + presence={}, + keyed_edus={}, + edus={}, + failures={}, + device_destinations=set(), + ) + + # Parse the rows in the stream and add to the buffer + for row in rows: + RowType = TypeToRow[row.type] + parsed_row = RowType.from_data(row.data) + parsed_row.add_to_buffer(buff) + + # We've finished collecting, send everything off + for destination, states in buff.presence.iteritems(): + federation_sender.send_presence(destination, states) + + for destination, edu_map in buff.keyed_edus.iteritems(): + for key, edu in edu_map.items(): + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=key, + ) + + for destination, edu_list in buff.edus.iteritems(): + for edu in edu_list: + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=None, + ) + + for destination, failure_list in buff.failures.iteritems(): + for failure in failure_list: + federation_sender.send_failure(destination, failure) + + for destination in buff.device_destinations: + federation_sender.send_device_messages(destination) From d4d176e5d0d130763a5379b317d3d3d039055ba4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:51:28 +0100 Subject: [PATCH 13/20] Add logging --- synapse/federation/send_queue.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 8a6392c697..867cba0cf1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -38,6 +38,10 @@ import synapse.metrics from blist import sorteddict from collections import namedtuple +import logging + +logger = logging.getLogger(__name__) + metrics = synapse.metrics.get_metrics_for(__name__) @@ -480,6 +484,10 @@ def process_rows_for_federation(federation_sender, rows): # Parse the rows in the stream and add to the buffer for row in rows: + if row.type not in TypeToRow: + logger.error("Unrecognized federation row type %r", row.type) + continue + RowType = TypeToRow[row.type] parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) From a828a64b754322f8a1483ec5256ab925039a6e39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:52:57 +0100 Subject: [PATCH 14/20] Comment --- synapse/federation/send_queue.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 867cba0cf1..c26da7acf8 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -312,16 +312,29 @@ class FederationRemoteSendQueue(object): class BaseFederationRow(object): - TypeId = None + """Base class for rows to be sent in the federation stream. + + Specifies how to identify, serialize and deserialize the different types. + """ + + TypeId = None # Unique string that ids the type. Must be overriden in sub classes. @staticmethod def from_data(data): """Parse the data from the federation stream into a row. + + Args: + data: The value of ``data`` from FederationStreamRow.data, type + depends on the type of stream """ raise NotImplementedError() def to_data(self): - """Serialize this row to be sent over the federation stream + """Serialize this row to be sent over the federation stream. + + Returns: + The value to be sent in FederationStreamRow.data. The type depends + on the type of stream. """ raise NotImplementedError() From 2a3e822f4494e42c1b118c2fa0132b3a2f13bbfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 13:47:04 +0100 Subject: [PATCH 15/20] Comment --- synapse/storage/state.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 86e5fdb76b..acd69944c4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -71,6 +71,15 @@ class StateStore(SQLBaseStore): @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): + """Get the current state event ids for a room based on the + current_state_events table. + + Args: + room_id (str) + + Returns: + deferred: dict of (type, state_key) -> event_id + """ def _get_current_state_ids_txn(txn): txn.execute( """SELECT type, state_key, event_id FROM current_state_events From ab904caf3324de82c338268984c979d66f00aed9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 10:02:17 +0100 Subject: [PATCH 16/20] Comments --- synapse/federation/send_queue.py | 10 ++++++---- synapse/replication/tcp/streams.py | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index c26da7acf8..657a930497 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -238,6 +238,8 @@ class FederationRemoteSendQueue(object): if from_token > self.pos: from_token = -1 + # list of tuple(int, BaseFederationRow), where the first is the position + # of the federation stream. rows = [] # There should be only one reader, so lets delete everything its @@ -476,14 +478,15 @@ BufferedToSend = namedtuple("BufferedToSend", ( def process_rows_for_federation(federation_sender, rows): - """Parse a list of rows from the federation stream and them send them out. + """Parse a list of rows from the federation stream and put them in the + transaction queue ready for sending to the relevant homeservers. Args: federation_sender (TransactionQueue) - rows (list(FederationStreamRow)) + rows (list(synapse.replication.tcp.streams.FederationStreamRow)) """ - # The federation stream containis a bunch of different types of + # The federation stream contains a bunch of different types of # rows that need to be handled differently. We parse the rows, put # them into the appropriate collection and then send them off. @@ -505,7 +508,6 @@ def process_rows_for_federation(federation_sender, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - # We've finished collecting, send everything off for destination, states in buff.presence.iteritems(): federation_sender.send_presence(destination, states) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 967b459e0e..369d5f2428 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -98,8 +98,8 @@ ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( "entity", # str )) FederationStreamRow = namedtuple("FederationStreamRow", ( - "type", # str - "data", # dict + "type", # str, the type of data as defined in the BaseFederationRows + "data", # dict, serialization of a federation.send_queue.BaseFederationRow )) TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( "user_id", # str From f8434db549acd400e880a1e2583ec2d077d46ebf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 10:03:07 +0100 Subject: [PATCH 17/20] Change name --- synapse/federation/send_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 657a930497..df807e57a6 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -468,7 +468,7 @@ TypeToRow = { } -BufferedToSend = namedtuple("BufferedToSend", ( +ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( "presence", # dict of destination -> [UserPresenceState] "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] @@ -490,7 +490,7 @@ def process_rows_for_federation(federation_sender, rows): # rows that need to be handled differently. We parse the rows, put # them into the appropriate collection and then send them off. - buff = BufferedToSend( + buff = ParsedFederationStreamData( presence={}, keyed_edus={}, edus={}, From 8c5f03cec746a1414eab3a052b583d9053086d87 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 10:07:18 +0100 Subject: [PATCH 18/20] Revert to sending the same data type as before --- synapse/federation/send_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index df807e57a6..95fd20e434 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -447,10 +447,10 @@ class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( @staticmethod def from_data(data): - return DeviceRow(destination=data) + return DeviceRow(destination=data["destination"]) def to_data(self): - return self.destination + return {"destination": self.destination} def add_to_buffer(self, buff): buff.device_destinations.add(self.destination) From 0364d2321023fb68ecafb82ede21fa64334593f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 11:32:05 +0100 Subject: [PATCH 19/20] Up replication ping timeout --- synapse/replication/tcp/protocol.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 19b1ce504f..5770b7125a 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -85,6 +85,8 @@ logger = logging.getLogger(__name__) PING_TIME = 5000 +PING_TIMEOUT_MULTIPLIER = 5 +PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER class ConnectionStates(object): @@ -166,7 +168,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): now = self.clock.time_msec() if self.time_we_closed: - if now - self.time_we_closed > PING_TIME * 3: + if now - self.time_we_closed > PING_TIMEOUT_MS: logger.info( "[%s] Failed to close connection gracefully, aborting", self.id() ) @@ -175,7 +177,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): if now - self.last_sent_command >= PING_TIME: self.send_command(PingCommand(now)) - if self.received_ping and now - self.last_received_command > PING_TIME * 3: + if self.received_ping and now - self.last_received_command > PING_TIMEOUT_MS: logger.info( "[%s] Connection hasn't received command in %r ms. Closing.", self.id(), now - self.last_received_command From 0018491af213e19ae73af4e84e3570762dc83c7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 12:44:17 +0100 Subject: [PATCH 20/20] Rename variable --- synapse/federation/send_queue.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 95fd20e434..748548bbe2 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -477,12 +477,12 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( )) -def process_rows_for_federation(federation_sender, rows): +def process_rows_for_federation(transaction_queue, rows): """Parse a list of rows from the federation stream and put them in the transaction queue ready for sending to the relevant homeservers. Args: - federation_sender (TransactionQueue) + transaction_queue (TransactionQueue) rows (list(synapse.replication.tcp.streams.FederationStreamRow)) """ @@ -509,23 +509,23 @@ def process_rows_for_federation(federation_sender, rows): parsed_row.add_to_buffer(buff) for destination, states in buff.presence.iteritems(): - federation_sender.send_presence(destination, states) + transaction_queue.send_presence(destination, states) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): - federation_sender.send_edu( + transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=key, ) for destination, edu_list in buff.edus.iteritems(): for edu in edu_list: - federation_sender.send_edu( + transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=None, ) for destination, failure_list in buff.failures.iteritems(): for failure in failure_list: - federation_sender.send_failure(destination, failure) + transaction_queue.send_failure(destination, failure) for destination in buff.device_destinations: - federation_sender.send_device_messages(destination) + transaction_queue.send_device_messages(destination)