From 4c3eb14d6884cea186614de501179c2dc8292c90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 31 Oct 2016 16:07:45 +0000 Subject: [PATCH 01/33] Increase batching of sent transaction inserts This should further reduce the number of individual inserts, transactions and updates that are required for keeping sent_transactions up to date. --- synapse/storage/transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 5055c04b24..232ccb96e1 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -62,7 +62,7 @@ class TransactionStore(SQLBaseStore): self.last_transaction = {} reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - self._clock.looping_call(self._persist_in_mem_txns, 1000) + self._clock.looping_call(self._persist_in_mem_txns, 10 * 1000) self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) From f09d2b692fded66f0e53b45c0cb5cec5544e4efa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 31 Oct 2016 17:08:41 +0000 Subject: [PATCH 02/33] Removed unused stuff --- synapse/storage/transactions.py | 166 +------------------------------- 1 file changed, 3 insertions(+), 163 deletions(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 232ccb96e1..59ece7b811 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,13 +16,12 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached -from twisted.internet import defer, reactor +from twisted.internet import defer from canonicaljson import encode_canonical_json from collections import namedtuple -import itertools import logging import ujson as json @@ -47,25 +46,6 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - def __init__(self, hs): - super(TransactionStore, self).__init__(hs) - - # New transactions that are currently in flights - self.inflight_transactions = {} - - # Newly delievered transactions that *weren't* persisted while in flight - self.new_delivered_transactions = {} - - # Newly delivered transactions that *were* persisted while in flight - self.update_delivered_transactions = {} - - self.last_transaction = {} - - reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - self._clock.looping_call(self._persist_in_mem_txns, 10 * 1000) - - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) - def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -148,46 +128,7 @@ class TransactionStore(SQLBaseStore): Returns: list: A list of previous transaction ids. """ - - auto_id = self._transaction_id_gen.get_next() - - txn_row = _TransactionRow( - id=auto_id, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None, - ) - - self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - - prev_txn = self.last_transaction.get(destination) - if prev_txn: - return defer.succeed(prev_txn) - else: - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) - - def _get_prevs_txn(self, txn, destination): - # First we find out what the prev_txns should be. - # Since we know that we are only sending one transaction at a time, - # we can simply take the last one. - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ?" - " ORDER BY id DESC LIMIT 1" - ) - - txn.execute(query, (destination,)) - results = self.cursor_to_dict(txn) - - prev_txns = [r["transaction_id"] for r in results] - - return prev_txns + return defer.succeed([]) def delivered_txn(self, transaction_id, destination, code, response_dict): """Persists the response for an outgoing transaction. @@ -198,52 +139,7 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - - txn_row = self.inflight_transactions.get( - destination, {} - ).pop(transaction_id, None) - - self.last_transaction[destination] = transaction_id - - if txn_row: - d = self.new_delivered_transactions.setdefault(destination, {}) - d[transaction_id] = txn_row._replace( - response_code=code, - response_json=None, # For now, don't persist response - ) - else: - d = self.update_delivered_transactions.setdefault(destination, {}) - # For now, don't persist response - d[transaction_id] = _UpdateTransactionRow(code, None) - - def get_transactions_after(self, transaction_id, destination): - """Get all transactions after a given local transaction_id. - - Args: - transaction_id (str) - destination (str) - - Returns: - list: A list of dicts - """ - return self.runInteraction( - "get_transactions_after", - self._get_transactions_after, transaction_id, destination - ) - - def _get_transactions_after(self, txn, transaction_id, destination): - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ? AND id >" - " (" - " SELECT id FROM sent_transactions" - " WHERE transaction_id = ? AND destination = ?" - " )" - ) - - txn.execute(query, (destination, transaction_id, destination)) - - return self.cursor_to_dict(txn) + pass @cached(max_entries=10000) def get_destination_retry_timings(self, destination): @@ -338,59 +234,3 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) - - @defer.inlineCallbacks - def _persist_in_mem_txns(self): - try: - inflight = self.inflight_transactions - new_delivered = self.new_delivered_transactions - update_delivered = self.update_delivered_transactions - - self.inflight_transactions = {} - self.new_delivered_transactions = {} - self.update_delivered_transactions = {} - - full_rows = [ - row._asdict() - for txn_map in itertools.chain(inflight.values(), new_delivered.values()) - for row in txn_map.values() - ] - - def f(txn): - if full_rows: - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) - - for dest, txn_map in update_delivered.items(): - for txn_id, update_row in txn_map.items(): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": txn_id, - "destination": dest, - }, - updatevalues={ - "response_code": update_row.response_code, - "response_json": None, # For now, don't persist response - } - ) - - if full_rows or update_delivered: - yield self.runInteraction("_persist_in_mem_txns", f) - except: - logger.exception("Failed to persist transactions!") - - def _cleanup_transactions(self): - now = self._clock.time_msec() - month_ago = now - 30 * 24 * 60 * 60 * 1000 - six_hours_ago = now - 6 * 60 * 60 * 1000 - - def _cleanup_transactions_txn(txn): - txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) - txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,)) - - return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) From 760469c81238f86a4ae6dbb485c9c2f4930a3483 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Nov 2016 11:42:08 +0000 Subject: [PATCH 03/33] Continue to clean up received_transactions --- synapse/storage/transactions.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 59ece7b811..adab520c78 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -46,6 +46,11 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -234,3 +239,12 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + def _cleanup_transactions(self): + now = self._clock.time_msec() + month_ago = now - 30 * 24 * 60 * 60 * 1000 + + def _cleanup_transactions_txn(txn): + txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + + return self.runInteraction("_cleanup_transactions", _cleanup_transactions_txn) From b1c27975d06ff23481a8b1ae0b384a9b5dedd04e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 2 Nov 2016 11:29:25 +0000 Subject: [PATCH 04/33] Set CORs headers on responses from the media repo --- synapse/http/server.py | 23 ++++++++++++++++----- synapse/rest/media/v1/download_resource.py | 3 ++- synapse/rest/media/v1/thumbnail_resource.py | 3 ++- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 168e53ce0c..14715878c5 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -392,17 +392,30 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False, request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),)) if send_cors: - request.setHeader("Access-Control-Allow-Origin", "*") - request.setHeader("Access-Control-Allow-Methods", - "GET, POST, PUT, DELETE, OPTIONS") - request.setHeader("Access-Control-Allow-Headers", - "Origin, X-Requested-With, Content-Type, Accept") + set_cors_headers(request) request.write(json_bytes) finish_request(request) return NOT_DONE_YET +def set_cors_headers(request): + """Set the CORs headers so that javascript running in a web browsers can + use this API + + Args: + request (twisted.web.http.Request): The http request to add CORs to. + """ + request.setHeader("Access-Control-Allow-Origin", "*") + request.setHeader( + "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS" + ) + request.setHeader( + "Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept" + ) + + def finish_request(request): """ Finish writing the response to the request. diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index a45ee9483e..dfb87ffd15 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -15,7 +15,7 @@ from ._base import parse_media_id, respond_with_file, respond_404 from twisted.web.resource import Resource -from synapse.http.server import request_handler +from synapse.http.server import request_handler, set_cors_headers from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -45,6 +45,7 @@ class DownloadResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): + set_cors_headers(request) request.setHeader( "Content-Security-Policy", "default-src 'none';" diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 0b9e1de1a7..d8f54adc99 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -17,7 +17,7 @@ from ._base import parse_media_id, respond_404, respond_with_file from twisted.web.resource import Resource from synapse.http.servlet import parse_string, parse_integer -from synapse.http.server import request_handler +from synapse.http.server import request_handler, set_cors_headers from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -48,6 +48,7 @@ class ThumbnailResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): + set_cors_headers(request) server_name, media_id, _ = parse_media_id(request) width = parse_integer(request, "width") height = parse_integer(request, "height") From 9084720993c3481142a256729aa49de592ba21eb Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 3 Nov 2016 10:42:14 +0000 Subject: [PATCH 05/33] Don't error on non-ascii passwords --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3635521230..3851b35889 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -653,7 +653,7 @@ class AuthHandler(BaseHandler): Returns: Hashed password (str). """ - return bcrypt.hashpw(password + self.hs.config.password_pepper, + return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, bcrypt.gensalt(self.bcrypt_rounds)) def validate_hash(self, password, stored_hash): From 8fd4d9129f17e9df42302269e4f73ae8417dde23 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Nov 2016 14:59:59 +0000 Subject: [PATCH 06/33] Replace postgres GIN with GIST This is because GIN can be slow to write too, especially when the table gets large. --- synapse/storage/prepare_database.py | 2 +- .../schema/delta/38/postgres_fts_gist.sql | 17 ++++++++++++ synapse/storage/search.py | 27 +++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/38/postgres_fts_gist.sql diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index d2c0aebe48..6576a30098 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 37 +SCHEMA_VERSION = 38 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/38/postgres_fts_gist.sql b/synapse/storage/schema/delta/38/postgres_fts_gist.sql new file mode 100644 index 0000000000..f090a7b75a --- /dev/null +++ b/synapse/storage/schema/delta/38/postgres_fts_gist.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_postgres_gist', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 12941d1775..eae90c2fd8 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,6 +31,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" + EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" def __init__(self, hs): super(SearchStore, self).__init__(hs) @@ -41,6 +42,10 @@ class SearchStore(BackgroundUpdateStore): self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order ) + self.register_background_update_handler( + self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, + self._background_reindex_gist_search + ) @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): @@ -139,6 +144,28 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(result) + @defer.inlineCallbacks + def _background_reindex_gist_search(self, progress, batch_size): + def create_index(conn): + conn.rollback() + conn.set_session(autocommit=True) + c = conn.cursor() + + # We create with NULLS FIRST so that when we search *backwards* + # we get the ones with non null origin_server_ts *first* + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx_gist" + " ON event_search USING GIST (vector)" + ) + c.execute("DROP INDEX event_search_fts_idx") + conn.set_session(autocommit=False) + + if isinstance(self.database_engine, PostgresEngine): + yield self.runWithConnection(create_index) + + yield self._end_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME) + defer.returnValue(1) + @defer.inlineCallbacks def _background_reindex_search_order(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] From 64c65669804fa137fe7b7be3a589349b50348a3e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Nov 2016 15:04:32 +0000 Subject: [PATCH 07/33] Remove spurious comment --- synapse/storage/search.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index eae90c2fd8..8f2b3c4435 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -151,13 +151,13 @@ class SearchStore(BackgroundUpdateStore): conn.set_session(autocommit=True) c = conn.cursor() - # We create with NULLS FIRST so that when we search *backwards* - # we get the ones with non null origin_server_ts *first* c.execute( "CREATE INDEX CONCURRENTLY event_search_fts_idx_gist" " ON event_search USING GIST (vector)" ) + c.execute("DROP INDEX event_search_fts_idx") + conn.set_session(autocommit=False) if isinstance(self.database_engine, PostgresEngine): From c1b077cd194d009470ad081311e2729769c9750a Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 3 Nov 2016 16:27:10 +0000 Subject: [PATCH 08/33] Now we have new-style metrics don't bother exporting legacy-named process ones --- synapse/metrics/process_collector.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index 0e95582368..4df3a2fa24 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -110,22 +110,7 @@ def _process_fds(): def register_process_collector(process_metrics): - # Legacy synapse-invented metric names - - resource_metrics = process_metrics.make_subspace("resource") - - resource_metrics.register_collector(update_resource_metrics) - - # msecs - resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000) - resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000) - - # kilobytes - resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * 1024) - - process_metrics.register_callback("fds", _process_fds, labels=["type"]) - - # New prometheus-standard metric names + process_metrics.register_collector(update_resource_metrics) if HAVE_PROC_SELF_STAT: process_metrics.register_callback( From 93ebeb2aa8bf32e0c0f63ff4c933a5e2d43cc29a Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 3 Nov 2016 16:37:09 +0000 Subject: [PATCH 09/33] Remove now-unused 'resource' import --- synapse/metrics/process_collector.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index 4df3a2fa24..f812a80e95 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -13,12 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Because otherwise 'resource' collides with synapse.metrics.resource -from __future__ import absolute_import - import os import stat -from resource import getrusage, RUSAGE_SELF TICKS_PER_SEC = 100 @@ -49,7 +45,6 @@ STAT_FIELDS = { } -rusage = None stats = {} fd_counts = None @@ -65,9 +60,6 @@ if HAVE_PROC_STAT: def update_resource_metrics(): - global rusage - rusage = getrusage(RUSAGE_SELF) - if HAVE_PROC_SELF_STAT: global stats with open("/proc/self/stat") as s: From 5219f7e06039aa51b5e578917f7f2609b9944fda Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 3 Nov 2016 16:41:32 +0000 Subject: [PATCH 10/33] Since we don't export per-filetype fd counts any more, delete all the code related to that too --- synapse/metrics/process_collector.py | 40 +++------------------------- 1 file changed, 4 insertions(+), 36 deletions(-) diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index f812a80e95..6fec3de399 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -14,7 +14,6 @@ # limitations under the License. import os -import stat TICKS_PER_SEC = 100 @@ -25,16 +24,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") HAVE_PROC_SELF_LIMITS = os.path.exists("/proc/self/limits") HAVE_PROC_SELF_FD = os.path.exists("/proc/self/fd") -TYPES = { - stat.S_IFSOCK: "SOCK", - stat.S_IFLNK: "LNK", - stat.S_IFREG: "REG", - stat.S_IFBLK: "BLK", - stat.S_IFDIR: "DIR", - stat.S_IFCHR: "CHR", - stat.S_IFIFO: "FIFO", -} - # Field indexes from /proc/self/stat, taken from the proc(5) manpage STAT_FIELDS = { "utime": 14, @@ -46,7 +35,6 @@ STAT_FIELDS = { stats = {} -fd_counts = None # In order to report process_start_time_seconds we need to know the # machine's boot time, because the value in /proc/self/stat is relative to @@ -72,33 +60,13 @@ def update_resource_metrics(): # we've lost the first two fields in PID and COMMAND above stats[name] = int(raw_stats[index - 3]) - global fd_counts - fd_counts = _process_fds() - - -def _process_fds(): - counts = {(k,): 0 for k in TYPES.values()} - counts[("other",)] = 0 +def _count_fds(): # Not every OS will have a /proc/self/fd directory if not HAVE_PROC_SELF_FD: - return counts + return 0 - for fd in os.listdir("/proc/self/fd"): - try: - s = os.stat("/proc/self/fd/%s" % (fd)) - fmt = stat.S_IFMT(s.st_mode) - if fmt in TYPES: - t = TYPES[fmt] - else: - t = "other" - - counts[(t,)] += 1 - except OSError: - # the dirh itself used by listdir() is usually missing by now - pass - - return counts + return len(os.listdir("/proc/self/fd")) def register_process_collector(process_metrics): @@ -135,7 +103,7 @@ def register_process_collector(process_metrics): if HAVE_PROC_SELF_FD: process_metrics.register_callback( "open_fds", - lambda: sum(fd_counts.values()) + lambda: _count_fds() ) if HAVE_PROC_SELF_LIMITS: From 2938a00825f34f271130a4c50fdb402f2dff8ead Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 3 Nov 2016 17:03:52 +0000 Subject: [PATCH 11/33] Rename the python-specific metrics now the docs claim that we have done --- synapse/metrics/__init__.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 7041da25ce..2265e6e8d6 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -111,18 +111,20 @@ def render_all(): return "\n".join(strs) -reactor_metrics = get_metrics_for("reactor") -tick_time = reactor_metrics.register_distribution("tick_time") -pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +register_process_collector(get_metrics_for("process")) -gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"]) -gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"]) -reactor_metrics.register_callback( +python_metrics = get_metrics_for("python") + +gc_time = python_metrics.register_distribution("gc_time", labels=["gen"]) +gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"]) +python_metrics.register_callback( "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] ) -register_process_collector(get_metrics_for("process")) +reactor_metrics = get_metrics_for("python.twisted.reactor") +tick_time = reactor_metrics.register_distribution("tick_time") +pending_calls_metric = reactor_metrics.register_distribution("pending_calls") def runUntilCurrentTimer(func): From 89e3e39d52ba6caa58c0a9d9dbb4e6d0461aa1f2 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 3 Nov 2016 17:04:13 +0000 Subject: [PATCH 12/33] Fix copypasto error in metric rename table in docs --- docs/metrics-howto.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst index 7aa4757a35..ca10799b00 100644 --- a/docs/metrics-howto.rst +++ b/docs/metrics-howto.rst @@ -51,9 +51,9 @@ python_gc_counts reactor_gc_counts The twisted-specific reactor metrics have been renamed. -==================================== ================= +==================================== ===================== New name Old name ------------------------------------- ----------------- -python_twisted_reactor_pending_calls reactor_tick_time +------------------------------------ --------------------- +python_twisted_reactor_pending_calls reactor_pending_calls python_twisted_reactor_tick_time reactor_tick_time -==================================== ================= +==================================== ===================== From a3f6576084fc2d0b7e5b95bffadc532d72145111 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 4 Nov 2016 10:48:20 +0000 Subject: [PATCH 13/33] Remove unused but buggy function --- synapse/storage/pusher.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8f5f8f24a9..5d1cb72a32 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -137,17 +137,7 @@ class PusherStore(SQLBaseStore): @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): - result = yield self._simple_select_many_batch( - table='pushers', - keyvalues={ - 'user_name': 'user_id', - }, - retcol='user_name', - desc='get_if_user_has_pusher', - allow_none=True, - ) - - defer.returnValue(bool(result)) + raise NotImplementedError() @cachedList(cached_method_name="get_if_user_has_pusher", list_name="user_ids", num_args=1, inlineCallbacks=True) From 63772443e6d03afeafffabee78c27b02f996faa3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 4 Nov 2016 10:53:42 +0000 Subject: [PATCH 14/33] Comment --- synapse/storage/pusher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 5d1cb72a32..8cc9f0353b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -137,6 +137,7 @@ class PusherStore(SQLBaseStore): @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): + # This only exists for the cachedList decorator raise NotImplementedError() @cachedList(cached_method_name="get_if_user_has_pusher", From c6bbad109bfa97bc31ae76d6f4d83111ffa04ec4 Mon Sep 17 00:00:00 2001 From: Euan Kemp Date: Sun, 6 Nov 2016 17:02:25 -0800 Subject: [PATCH 15/33] default config: blacklist more internal ips --- synapse/config/repository.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 8810079848..2c6f57168e 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -167,6 +167,8 @@ class ContentRepositoryConfig(Config): # - '10.0.0.0/8' # - '172.16.0.0/12' # - '192.168.0.0/16' + # - '100.64.0.0/10' + # - '169.254.0.0/16' # # List of IP address CIDR ranges that the URL preview spider is allowed # to access even if they are specified in url_preview_ip_range_blacklist. From eeda4e618c0c325e43e82a1d3ac146fbe419b446 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Nov 2016 11:02:29 +0000 Subject: [PATCH 16/33] Limit the number of prev_events of new events --- synapse/handlers/message.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index abfa8c65a4..3eca46df86 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -34,6 +34,7 @@ from ._base import BaseHandler from canonicaljson import encode_canonical_json import logging +import random logger = logging.getLogger(__name__) @@ -415,6 +416,18 @@ class MessageHandler(BaseHandler): builder.room_id, ) + # We want to limit the max number of prev events we point to in our + # new event + if len(latest_ret) > 10: + # Sort by reverse depth, so we point to the most recent. + latest_ret.sort(key=lambda a: -a[2]) + new_latest_ret = latest_ret[:5] + + # We also randomly point to some of the older events, to make + # sure that we don't completely ignore the older events. + new_latest_ret.extend(random.sample(latest_ret, 5)) + latest_ret = new_latest_ret + if latest_ret: depth = max([d for _, _, d in latest_ret]) + 1 else: From 24772ba56eb2959404558fb373e3182668838298 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Nov 2016 11:07:18 +0000 Subject: [PATCH 17/33] Respect use_frozen_dicts option in workers --- synapse/app/appservice.py | 4 ++++ synapse/app/client_reader.py | 4 ++++ synapse/app/federation_reader.py | 4 ++++ synapse/app/media_repository.py | 4 ++++ synapse/app/pusher.py | 4 ++++ synapse/app/synchrotron.py | 2 ++ 6 files changed, 22 insertions(+) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 4e62a84b28..dd9ee406a1 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -34,6 +34,8 @@ from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -151,6 +153,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) if config.notify_appservices: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 9fccc73db3..0086a2977e 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -41,6 +41,8 @@ from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.crypto import context_factory +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -165,6 +167,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 1f5ae1937e..b5f59a9931 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -39,6 +39,8 @@ from synapse.api.urls import FEDERATION_PREFIX from synapse.federation.transport.server import TransportLayerServer from synapse.crypto import context_factory +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -156,6 +158,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 6e5ec01c6c..44c19a1bef 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -41,6 +41,8 @@ from synapse.api.urls import ( ) from synapse.crypto import context_factory +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -162,6 +164,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 1a6f5507a9..a0e765c54f 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -36,6 +36,8 @@ from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -239,6 +241,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + if config.start_pushers: sys.stderr.write( "\nThe pushers must be disabled in the main synapse process" diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 64b209ffe6..bf1b995dc2 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -446,6 +446,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) ss = SynchrotronServer( From a4632783fba26aaaedd0b9777877cb2ec8b48752 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Nov 2016 11:20:26 +0000 Subject: [PATCH 18/33] Sample correctly --- synapse/handlers/message.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3eca46df86..81df45177a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -425,7 +425,9 @@ class MessageHandler(BaseHandler): # We also randomly point to some of the older events, to make # sure that we don't completely ignore the older events. - new_latest_ret.extend(random.sample(latest_ret, 5)) + if latest_ret[5:]: + sample_size = min(5, len(latest_ret[5:])) + new_latest_ret.extend(random.sample(latest_ret[5:], sample_size)) latest_ret = new_latest_ret if latest_ret: From ac507e7ab8eb4dd02c9229d312d1f278a5b041b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Nov 2016 17:23:28 +0000 Subject: [PATCH 19/33] Don't assume providers raise ConfigError's --- synapse/config/password_auth_providers.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py index f6d9bb1c62..1f438d2bb3 100644 --- a/synapse/config/password_auth_providers.py +++ b/synapse/config/password_auth_providers.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config +from ._base import Config, ConfigError import importlib @@ -39,7 +39,12 @@ class PasswordAuthProviderConfig(Config): module = importlib.import_module(module) provider_class = getattr(module, clz) - provider_config = provider_class.parse_config(provider["config"]) + try: + provider_config = provider_class.parse_config(provider["config"]) + except Exception as e: + raise ConfigError( + "Failed to parse config for %r: %r" % (provider['module'], e) + ) self.password_providers.append((provider_class, provider_config)) def default_config(self, **kwargs): From 2771447c29402e1f26bb45bdb730bdd0fe02682f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 10 Nov 2016 14:49:26 +0000 Subject: [PATCH 20/33] Store Promise instead of Response for HTTP API transactions This fixes a race whereby: - User hits an endpoint. - No cached transaction so executes main code. - User hits same endpoint. - No cache transaction so executes main code. - Main code finishes executing and caches response and returns. - Main code finishes executing and caches response and returns. This race is common in the wild when Synapse is struggling under load. This commit fixes the race by: - User hits an endpoint. - Caches the promise to execute the main code and executes main code. - User hits same endpoint. - Yields on the same promise as the first request. - Main code finishes executing and returns, unblocking both requests. --- synapse/rest/client/v1/base.py | 4 +- synapse/rest/client/v1/room.py | 82 ++++++++++---------- synapse/rest/client/v1/transactions.py | 52 ++++--------- synapse/rest/client/v2_alpha/sendtodevice.py | 18 +++-- 4 files changed, 68 insertions(+), 88 deletions(-) diff --git a/synapse/rest/client/v1/base.py b/synapse/rest/client/v1/base.py index c2a8447860..22c740c30c 100644 --- a/synapse/rest/client/v1/base.py +++ b/synapse/rest/client/v1/base.py @@ -18,7 +18,7 @@ from synapse.http.servlet import RestServlet from synapse.api.urls import CLIENT_PREFIX -from .transactions import HttpTransactionStore +from .transactions import HttpTransactionCache import re import logging @@ -59,4 +59,4 @@ class ClientV1RestServlet(RestServlet): self.hs = hs self.builder_factory = hs.get_event_builder_factory() self.auth = hs.get_v1auth() - self.txns = HttpTransactionStore() + self.txns = HttpTransactionCache() diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 010fbc7c32..2e919de9f3 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -56,15 +56,15 @@ class RoomCreateRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, txn_id): try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) + res_deferred = self.txns.get_client_transaction(request, txn_id) + res = yield res_deferred + defer.returnValue(res) except KeyError: pass - response = yield self.on_POST(request) - - self.txns.store_client_transaction(request, txn_id, response) + res_deferred = self.on_POST(request) + self.txns.store_client_transaction(request, txn_id, res_deferred) + response = yield res_deferred defer.returnValue(response) @defer.inlineCallbacks @@ -217,15 +217,15 @@ class RoomSendEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, txn_id): try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) + res_deferred = self.txns.get_client_transaction(request, txn_id) + res = yield res_deferred + defer.returnValue(res) except KeyError: pass - - response = yield self.on_POST(request, room_id, event_type, txn_id) - - self.txns.store_client_transaction(request, txn_id, response) + + res_deferred = self.on_POST(request, room_id, event_type, txn_id) + self.txns.store_client_transaction(request, txn_id, res_deferred) + response = yield res_deferred defer.returnValue(response) @@ -286,15 +286,15 @@ class JoinRoomAliasServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_identifier, txn_id): try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) + res_deferred = self.txns.get_client_transaction(request, txn_id) + res = yield res_deferred + defer.returnValue(res) except KeyError: pass - - response = yield self.on_POST(request, room_identifier, txn_id) - - self.txns.store_client_transaction(request, txn_id, response) + + res_deferred = self.on_POST(request, room_identifier, txn_id) + self.txns.store_client_transaction(request, txn_id, res_deferred) + response = yield res_deferred defer.returnValue(response) @@ -540,17 +540,15 @@ class RoomForgetRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, txn_id): try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) + res_deferred = self.txns.get_client_transaction(request, txn_id) + res = yield res_deferred + defer.returnValue(res) except KeyError: pass - - response = yield self.on_POST( - request, room_id, txn_id - ) - - self.txns.store_client_transaction(request, txn_id, response) + + res_deferred = self.on_POST(request, room_id, txn_id) + self.txns.store_client_transaction(request, txn_id, res_deferred) + response = yield res_deferred defer.returnValue(response) @@ -626,17 +624,15 @@ class RoomMembershipRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, membership_action, txn_id): try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) + res_deferred = self.txns.get_client_transaction(request, txn_id) + res = yield res_deferred + defer.returnValue(res) except KeyError: pass - response = yield self.on_POST( - request, room_id, membership_action, txn_id - ) - - self.txns.store_client_transaction(request, txn_id, response) + res_deferred = self.on_POST(request, room_id, membership_action, txn_id) + self.txns.store_client_transaction(request, txn_id, res_deferred) + response = yield res_deferred defer.returnValue(response) @@ -672,15 +668,15 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, event_id, txn_id): try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) + res_deferred = self.txns.get_client_transaction(request, txn_id) + res = yield res_deferred + defer.returnValue(res) except KeyError: pass - response = yield self.on_POST(request, room_id, event_id, txn_id) - - self.txns.store_client_transaction(request, txn_id, response) + res_deferred = self.on_POST(request, room_id, event_id, txn_id) + self.txns.store_client_transaction(request, txn_id, res_deferred) + response = yield res_deferred defer.returnValue(response) diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py index 2f2c9d0881..f5012c5f59 100644 --- a/synapse/rest/client/v1/transactions.py +++ b/synapse/rest/client/v1/transactions.py @@ -22,57 +22,35 @@ from synapse.api.auth import get_access_token_from_request logger = logging.getLogger(__name__) -# FIXME: elsewhere we use FooStore to indicate something in the storage layer... -class HttpTransactionStore(object): +class HttpTransactionCache(object): def __init__(self): - # { key : (txn_id, response) } + # { key : (txn_id, response_deferred) } self.transactions = {} - def get_response(self, key, txn_id): - """Retrieve a response for this request. - - Args: - key (str): A transaction-independent key for this request. Usually - this is a combination of the path (without the transaction id) - and the user's access token. - txn_id (str): The transaction ID for this request - Returns: - A tuple of (HTTP response code, response content) or None. - """ + def _get_response(self, key, txn_id): try: - logger.debug("get_response TxnId: %s", txn_id) - (last_txn_id, response) = self.transactions[key] + (last_txn_id, response_deferred) = self.transactions[key] if txn_id == last_txn_id: logger.info("get_response: Returning a response for %s", txn_id) - return response + return response_deferred except KeyError: pass return None - def store_response(self, key, txn_id, response): - """Stores an HTTP response tuple. + def _store_response(self, key, txn_id, response_deferred): + self.transactions[key] = (txn_id, response_deferred) - Args: - key (str): A transaction-independent key for this request. Usually - this is a combination of the path (without the transaction id) - and the user's access token. - txn_id (str): The transaction ID for this request. - response (tuple): A tuple of (HTTP response code, response content) - """ - logger.debug("store_response TxnId: %s", txn_id) - self.transactions[key] = (txn_id, response) - - def store_client_transaction(self, request, txn_id, response): - """Stores the request/response pair of an HTTP transaction. + def store_client_transaction(self, request, txn_id, response_deferred): + """Stores the request/Promise pair of an HTTP transaction. Args: request (twisted.web.http.Request): The twisted HTTP request. This request must have the transaction ID as the last path segment. - response (tuple): A tuple of (response code, response dict) + response_deferred (Promise): A tuple of (response code, response dict) txn_id (str): The transaction ID for this request. """ - self.store_response(self._get_key(request), txn_id, response) + self._store_response(self._get_key(request), txn_id, response_deferred) def get_client_transaction(self, request, txn_id): """Retrieves a stored response if there was one. @@ -82,14 +60,14 @@ class HttpTransactionStore(object): request must have the transaction ID as the last path segment. txn_id (str): The transaction ID for this request. Returns: - The response tuple. + Promise: Resolves to the response tuple. Raises: KeyError if the transaction was not found. """ - response = self.get_response(self._get_key(request), txn_id) - if response is None: + response_deferred = self._get_response(self._get_key(request), txn_id) + if response_deferred is None: raise KeyError("Transaction not found.") - return response + return response_deferred def _get_key(self, request): token = get_access_token_from_request(request) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 5975164b37..7c800ca895 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -19,7 +19,7 @@ from twisted.internet import defer from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request -from synapse.rest.client.v1.transactions import HttpTransactionStore +from synapse.rest.client.v1.transactions import HttpTransactionCache from ._base import client_v2_patterns @@ -40,18 +40,25 @@ class SendToDeviceRestServlet(servlet.RestServlet): super(SendToDeviceRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.txns = HttpTransactionStore() + self.txns = HttpTransactionCache() self.device_message_handler = hs.get_device_message_handler() @defer.inlineCallbacks def on_PUT(self, request, message_type, txn_id): try: - defer.returnValue( - self.txns.get_client_transaction(request, txn_id) - ) + res_deferred = self.txns.get_client_transaction(request, txn_id) + res = yield res_deferred + defer.returnValue(res) except KeyError: pass + + res_deferred = self._put(request, message_type, txn_id) + self.txns.store_client_transaction(request, txn_id, res_deferred) + res = yield res_deferred + defer.returnValue(res) + @defer.inlineCallbacks + def _put(self, request, message_type, txn_id): requester = yield self.auth.get_user_by_req(request) content = parse_json_object_from_request(request) @@ -63,7 +70,6 @@ class SendToDeviceRestServlet(servlet.RestServlet): ) response = (200, {}) - self.txns.store_client_transaction(request, txn_id, response) defer.returnValue(response) From 8a8ad46f48525812c983dadcd9e2757cebaaa6cf Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 10 Nov 2016 15:22:11 +0000 Subject: [PATCH 21/33] Flake8 --- synapse/rest/client/v1/room.py | 6 +++--- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2e919de9f3..caa779ffaf 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -222,7 +222,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): defer.returnValue(res) except KeyError: pass - + res_deferred = self.on_POST(request, room_id, event_type, txn_id) self.txns.store_client_transaction(request, txn_id, res_deferred) response = yield res_deferred @@ -291,7 +291,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): defer.returnValue(res) except KeyError: pass - + res_deferred = self.on_POST(request, room_identifier, txn_id) self.txns.store_client_transaction(request, txn_id, res_deferred) response = yield res_deferred @@ -545,7 +545,7 @@ class RoomForgetRestServlet(ClientV1RestServlet): defer.returnValue(res) except KeyError: pass - + res_deferred = self.on_POST(request, room_id, txn_id) self.txns.store_client_transaction(request, txn_id, res_deferred) response = yield res_deferred diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 7c800ca895..4d13e793ac 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -51,7 +51,7 @@ class SendToDeviceRestServlet(servlet.RestServlet): defer.returnValue(res) except KeyError: pass - + res_deferred = self._put(request, message_type, txn_id) self.txns.store_client_transaction(request, txn_id, res_deferred) res = yield res_deferred From c7daf3136c8ae024187cd50530d3ee90b1385b13 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 11 Nov 2016 14:13:32 +0000 Subject: [PATCH 22/33] Use observable deferreds because they are sane --- synapse/rest/client/v2_alpha/sendtodevice.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 4d13e793ac..31167ba535 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -20,6 +20,7 @@ from twisted.internet import defer from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.v1.transactions import HttpTransactionCache +from synapse.util.async import ObservableDeferred from ._base import client_v2_patterns @@ -47,14 +48,14 @@ class SendToDeviceRestServlet(servlet.RestServlet): def on_PUT(self, request, message_type, txn_id): try: res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred + res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = self._put(request, message_type, txn_id) + res_deferred = ObservableDeferred(self._put(request, message_type, txn_id)) self.txns.store_client_transaction(request, txn_id, res_deferred) - res = yield res_deferred + res = yield res_deferred.observe() defer.returnValue(res) @defer.inlineCallbacks From 42c43cfafd3e0471e4e0f6fb05e951290783ba1f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 11 Nov 2016 14:54:10 +0000 Subject: [PATCH 23/33] Use ObservableDeferreds instead of Deferreds as they behave as intended --- synapse/rest/client/v1/room.py | 39 +++++++++++++------------- synapse/rest/client/v1/transactions.py | 22 +++++++-------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index caa779ffaf..47c4b84b73 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -22,6 +22,7 @@ from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter from synapse.types import UserID, RoomID, RoomAlias +from synapse.util.async import ObservableDeferred from synapse.events.utils import serialize_event, format_event_for_client_v2 from synapse.http.servlet import ( parse_json_object_from_request, parse_string, parse_integer @@ -57,14 +58,14 @@ class RoomCreateRestServlet(ClientV1RestServlet): def on_PUT(self, request, txn_id): try: res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred + res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = self.on_POST(request) + res_deferred = ObservableDeferred(self.on_POST(request)) self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred + response = yield res_deferred.observe() defer.returnValue(response) @defer.inlineCallbacks @@ -218,14 +219,14 @@ class RoomSendEventRestServlet(ClientV1RestServlet): def on_PUT(self, request, room_id, event_type, txn_id): try: res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred + res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = self.on_POST(request, room_id, event_type, txn_id) + res_deferred = ObservableDeferred(self.on_POST(request, room_id, event_type, txn_id)) self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred + response = yield res_deferred.observe() defer.returnValue(response) @@ -287,14 +288,14 @@ class JoinRoomAliasServlet(ClientV1RestServlet): def on_PUT(self, request, room_identifier, txn_id): try: res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred + res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = self.on_POST(request, room_identifier, txn_id) + res_deferred = ObservableDeferred(self.on_POST(request, room_identifier, txn_id)) self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred + response = yield res_deferred.observe() defer.returnValue(response) @@ -541,14 +542,14 @@ class RoomForgetRestServlet(ClientV1RestServlet): def on_PUT(self, request, room_id, txn_id): try: res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred + res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = self.on_POST(request, room_id, txn_id) + res_deferred = ObservableDeferred(self.on_POST(request, room_id, txn_id)) self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred + response = yield res_deferred.observe() defer.returnValue(response) @@ -624,15 +625,15 @@ class RoomMembershipRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, membership_action, txn_id): try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred + res_deferred = ObservableDeferred(self.txns.get_client_transaction(request, txn_id)) + res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = self.on_POST(request, room_id, membership_action, txn_id) + res_deferred = ObservableDeffself.on_POST(request, room_id, membership_action, txn_id) self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred + response = yield res_deferred.observe() defer.returnValue(response) @@ -669,14 +670,14 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): def on_PUT(self, request, room_id, event_id, txn_id): try: res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred + res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = self.on_POST(request, room_id, event_id, txn_id) + res_deferred = ObservableDeferred(self.on_POST(request, room_id, event_id, txn_id)) self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred + response = yield res_deferred.observe() defer.returnValue(response) diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py index f5012c5f59..774430458a 100644 --- a/synapse/rest/client/v1/transactions.py +++ b/synapse/rest/client/v1/transactions.py @@ -25,32 +25,32 @@ logger = logging.getLogger(__name__) class HttpTransactionCache(object): def __init__(self): - # { key : (txn_id, response_deferred) } + # { key : (txn_id, res_observ_defer) } self.transactions = {} def _get_response(self, key, txn_id): try: - (last_txn_id, response_deferred) = self.transactions[key] + (last_txn_id, res_observ_defer) = self.transactions[key] if txn_id == last_txn_id: logger.info("get_response: Returning a response for %s", txn_id) - return response_deferred + return res_observ_defer except KeyError: pass return None - def _store_response(self, key, txn_id, response_deferred): - self.transactions[key] = (txn_id, response_deferred) + def _store_response(self, key, txn_id, res_observ_defer): + self.transactions[key] = (txn_id, res_observ_defer) - def store_client_transaction(self, request, txn_id, response_deferred): + def store_client_transaction(self, request, txn_id, res_observ_defer): """Stores the request/Promise pair of an HTTP transaction. Args: request (twisted.web.http.Request): The twisted HTTP request. This request must have the transaction ID as the last path segment. - response_deferred (Promise): A tuple of (response code, response dict) + res_observ_defer (Promise): A tuple of (response code, response dict) txn_id (str): The transaction ID for this request. """ - self._store_response(self._get_key(request), txn_id, response_deferred) + self._store_response(self._get_key(request), txn_id, res_observ_defer) def get_client_transaction(self, request, txn_id): """Retrieves a stored response if there was one. @@ -64,10 +64,10 @@ class HttpTransactionCache(object): Raises: KeyError if the transaction was not found. """ - response_deferred = self._get_response(self._get_key(request), txn_id) - if response_deferred is None: + res_observ_defer = self._get_response(self._get_key(request), txn_id) + if res_observ_defer is None: raise KeyError("Transaction not found.") - return response_deferred + return res_observ_defer def _get_key(self, request): token = get_access_token_from_request(request) From a88bc67f88bf5b932283372818c7e33bedfbaa0b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 11 Nov 2016 15:02:29 +0000 Subject: [PATCH 24/33] Flake8 and fix whoopsie --- synapse/rest/client/v1/room.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 47c4b84b73..a866615f71 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -224,7 +224,9 @@ class RoomSendEventRestServlet(ClientV1RestServlet): except KeyError: pass - res_deferred = ObservableDeferred(self.on_POST(request, room_id, event_type, txn_id)) + res_deferred = ObservableDeferred( + self.on_POST(request, room_id, event_type, txn_id) + ) self.txns.store_client_transaction(request, txn_id, res_deferred) response = yield res_deferred.observe() defer.returnValue(response) @@ -625,13 +627,13 @@ class RoomMembershipRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, membership_action, txn_id): try: - res_deferred = ObservableDeferred(self.txns.get_client_transaction(request, txn_id)) + res_deferred = self.txns.get_client_transaction(request, txn_id) res = yield res_deferred.observe() defer.returnValue(res) except KeyError: pass - res_deferred = ObservableDeffself.on_POST(request, room_id, membership_action, txn_id) + res_deferred = ObservableDeferred(self.on_POST(request, room_id, membership_action, txn_id)) self.txns.store_client_transaction(request, txn_id, res_deferred) response = yield res_deferred.observe() defer.returnValue(response) @@ -675,7 +677,9 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): except KeyError: pass - res_deferred = ObservableDeferred(self.on_POST(request, room_id, event_id, txn_id)) + res_deferred = ObservableDeferred( + self.on_POST(request, room_id, event_id, txn_id) + ) self.txns.store_client_transaction(request, txn_id, res_deferred) response = yield res_deferred.observe() defer.returnValue(response) From f6c48802f5d01cbbc5fbd9ac151b84a80a2e3ac3 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 11 Nov 2016 15:08:24 +0000 Subject: [PATCH 25/33] More flake8 --- synapse/rest/client/v1/room.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index a866615f71..7af4077721 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -633,7 +633,9 @@ class RoomMembershipRestServlet(ClientV1RestServlet): except KeyError: pass - res_deferred = ObservableDeferred(self.on_POST(request, room_id, membership_action, txn_id)) + res_deferred = ObservableDeferred( + self.on_POST(request, room_id, membership_action, txn_id) + ) self.txns.store_client_transaction(request, txn_id, res_deferred) response = yield res_deferred.observe() defer.returnValue(response) From 8ecaff51a147948f977e745bace697ffcba6595b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 11 Nov 2016 17:47:03 +0000 Subject: [PATCH 26/33] Review comments --- synapse/rest/client/transactions.py | 85 +++++++++++++++++ synapse/rest/client/v1/base.py | 3 +- synapse/rest/client/v1/room.py | 97 ++++++-------------- synapse/rest/client/v1/transactions.py | 75 --------------- synapse/rest/client/v2_alpha/sendtodevice.py | 17 +--- 5 files changed, 119 insertions(+), 158 deletions(-) create mode 100644 synapse/rest/client/transactions.py delete mode 100644 synapse/rest/client/v1/transactions.py diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py new file mode 100644 index 0000000000..1db972a378 --- /dev/null +++ b/synapse/rest/client/transactions.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module contains logic for storing HTTP PUT transactions. This is used +to ensure idempotency when performing PUTs using the REST API.""" +import logging + +from synapse.api.auth import get_access_token_from_request +from synapse.util.async import ObservableDeferred + +logger = logging.getLogger(__name__) + + +def get_transaction_key(request): + """A helper function which returns a transaction key that can be used + with TransactionCache for idempotent requests. + + Idempotency is based on the returned key being the same for separate + requests to the same endpoint. The key is formed from the HTTP request + path and the access_token for the requesting user. + + Args: + request (twisted.web.http.Request): The incoming request. Must + contain an access_token. + Returns: + str: A transaction key + """ + token = get_access_token_from_request(request) + return request.path + "/" + token + + +class HttpTransactionCache(object): + + def __init__(self): + self.transactions = { + # $txn_key: ObservableDeferred<(res_code, res_json_body)> + } + + def fetch_or_execute_request(self, request, fn, *args, **kwargs): + """A helper function for fetch_or_execute which extracts + a transaction key from the given request. + + See: + fetch_or_execute + """ + return self.fetch_or_execute( + get_transaction_key(request), fn, *args, **kwargs + ) + + def fetch_or_execute(self, txn_key, fn, *args, **kwargs): + """Fetches the response for this transaction, or executes the given function + to produce a response for this transaction. + + Args: + txn_key (str): A key to ensure idempotency should fetch_or_execute be + called again at a later point in time. + fn (function): A function which returns a tuple of + (response_code, response_dict)d + *args: Arguments to pass to fn. + **kwargs: Keyword arguments to pass to fn. + Returns: + synapse.util.async.ObservableDeferred which resolves to a tuple + of (response_code, response_dict). + """ + try: + return self.transactions[txn_key] + except KeyError: + pass # execute the function instead. + + deferred = fn(*args, **kwargs) + observable = ObservableDeferred(deferred) + self.transactions[txn_key] = observable + return observable diff --git a/synapse/rest/client/v1/base.py b/synapse/rest/client/v1/base.py index 22c740c30c..07ff5b218c 100644 --- a/synapse/rest/client/v1/base.py +++ b/synapse/rest/client/v1/base.py @@ -18,7 +18,8 @@ from synapse.http.servlet import RestServlet from synapse.api.urls import CLIENT_PREFIX -from .transactions import HttpTransactionCache +from synapse.rest.client.transactions import HttpTransactionCache + import re import logging diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 7af4077721..0622e64380 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -22,7 +22,6 @@ from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter from synapse.types import UserID, RoomID, RoomAlias -from synapse.util.async import ObservableDeferred from synapse.events.utils import serialize_event, format_event_for_client_v2 from synapse.http.servlet import ( parse_json_object_from_request, parse_string, parse_integer @@ -56,17 +55,11 @@ class RoomCreateRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, txn_id): - try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred.observe() - defer.returnValue(res) - except KeyError: - pass - - res_deferred = ObservableDeferred(self.on_POST(request)) - self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred.observe() - defer.returnValue(response) + observable = self.txns.fetch_or_execute_request( + request, self.on_POST, request + ) + res = yield observable.observe() + defer.returnValue(res) @defer.inlineCallbacks def on_POST(self, request): @@ -217,19 +210,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, txn_id): - try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred.observe() - defer.returnValue(res) - except KeyError: - pass - - res_deferred = ObservableDeferred( - self.on_POST(request, room_id, event_type, txn_id) + observable = self.txns.fetch_or_execute_request( + request, self.on_POST, request, room_id, event_type, txn_id ) - self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred.observe() - defer.returnValue(response) + res = yield observable.observe() + defer.returnValue(res) # TODO: Needs unit testing for room ID + alias joins @@ -288,17 +273,11 @@ class JoinRoomAliasServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_identifier, txn_id): - try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred.observe() - defer.returnValue(res) - except KeyError: - pass - - res_deferred = ObservableDeferred(self.on_POST(request, room_identifier, txn_id)) - self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred.observe() - defer.returnValue(response) + observable = self.txns.fetch_or_execute_request( + request, self.on_POST, request, room_identifier, txn_id + ) + res = yield observable.observe() + defer.returnValue(res) # TODO: Needs unit testing @@ -542,17 +521,11 @@ class RoomForgetRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, txn_id): - try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred.observe() - defer.returnValue(res) - except KeyError: - pass - - res_deferred = ObservableDeferred(self.on_POST(request, room_id, txn_id)) - self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred.observe() - defer.returnValue(response) + observable = self.txns.fetch_or_execute_request( + request, self.on_POST, request, room_id, txn_id + ) + res = yield observable.observe() + defer.returnValue(res) # TODO: Needs unit testing @@ -626,19 +599,11 @@ class RoomMembershipRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, membership_action, txn_id): - try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred.observe() - defer.returnValue(res) - except KeyError: - pass - - res_deferred = ObservableDeferred( - self.on_POST(request, room_id, membership_action, txn_id) + observable = self.txns.fetch_or_execute_request( + request, self.on_POST, request, room_id, membership_action, txn_id ) - self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred.observe() - defer.returnValue(response) + res = yield observable.observe() + defer.returnValue(res) class RoomRedactEventRestServlet(ClientV1RestServlet): @@ -672,19 +637,11 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_PUT(self, request, room_id, event_id, txn_id): - try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred.observe() - defer.returnValue(res) - except KeyError: - pass - - res_deferred = ObservableDeferred( - self.on_POST(request, room_id, event_id, txn_id) + observable = self.txns.fetch_or_execute_request( + request, self.on_POST, request, room_id, event_id, txn_id ) - self.txns.store_client_transaction(request, txn_id, res_deferred) - response = yield res_deferred.observe() - defer.returnValue(response) + res = yield observable.observe() + defer.returnValue(res) class RoomTypingRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py deleted file mode 100644 index 774430458a..0000000000 --- a/synapse/rest/client/v1/transactions.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains logic for storing HTTP PUT transactions. This is used -to ensure idempotency when performing PUTs using the REST API.""" -import logging - -from synapse.api.auth import get_access_token_from_request - -logger = logging.getLogger(__name__) - - -class HttpTransactionCache(object): - - def __init__(self): - # { key : (txn_id, res_observ_defer) } - self.transactions = {} - - def _get_response(self, key, txn_id): - try: - (last_txn_id, res_observ_defer) = self.transactions[key] - if txn_id == last_txn_id: - logger.info("get_response: Returning a response for %s", txn_id) - return res_observ_defer - except KeyError: - pass - return None - - def _store_response(self, key, txn_id, res_observ_defer): - self.transactions[key] = (txn_id, res_observ_defer) - - def store_client_transaction(self, request, txn_id, res_observ_defer): - """Stores the request/Promise pair of an HTTP transaction. - - Args: - request (twisted.web.http.Request): The twisted HTTP request. This - request must have the transaction ID as the last path segment. - res_observ_defer (Promise): A tuple of (response code, response dict) - txn_id (str): The transaction ID for this request. - """ - self._store_response(self._get_key(request), txn_id, res_observ_defer) - - def get_client_transaction(self, request, txn_id): - """Retrieves a stored response if there was one. - - Args: - request (twisted.web.http.Request): The twisted HTTP request. This - request must have the transaction ID as the last path segment. - txn_id (str): The transaction ID for this request. - Returns: - Promise: Resolves to the response tuple. - Raises: - KeyError if the transaction was not found. - """ - res_observ_defer = self._get_response(self._get_key(request), txn_id) - if res_observ_defer is None: - raise KeyError("Transaction not found.") - return res_observ_defer - - def _get_key(self, request): - token = get_access_token_from_request(request) - path_without_txn_id = request.path.rsplit("/", 1)[0] - return path_without_txn_id + "/" + token diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 31167ba535..2ce038c6cd 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -19,8 +19,7 @@ from twisted.internet import defer from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request -from synapse.rest.client.v1.transactions import HttpTransactionCache -from synapse.util.async import ObservableDeferred +from synapse.rest.client.transactions import HttpTransactionCache from ._base import client_v2_patterns @@ -46,16 +45,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): @defer.inlineCallbacks def on_PUT(self, request, message_type, txn_id): - try: - res_deferred = self.txns.get_client_transaction(request, txn_id) - res = yield res_deferred.observe() - defer.returnValue(res) - except KeyError: - pass - - res_deferred = ObservableDeferred(self._put(request, message_type, txn_id)) - self.txns.store_client_transaction(request, txn_id, res_deferred) - res = yield res_deferred.observe() + observable = self.txns.fetch_or_execute_request( + request, self._put, request, message_type, txn_id + ) + res = yield observable.observe() defer.returnValue(res) @defer.inlineCallbacks From 1c93cd9f9fc393b096790adf23336326dd6737cc Mon Sep 17 00:00:00 2001 From: Daniel Dent Date: Sat, 12 Nov 2016 00:10:23 -0800 Subject: [PATCH 27/33] Add support for durations in minutes --- synapse/config/_base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index af9f17bf7b..1ab5593c6e 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -64,11 +64,12 @@ class Config(object): if isinstance(value, int) or isinstance(value, long): return value second = 1000 - hour = 60 * 60 * second + minute = 60 * second + hour = 60 * minute day = 24 * hour week = 7 * day year = 365 * day - sizes = {"s": second, "h": hour, "d": day, "w": week, "y": year} + sizes = {"s": second, "m": minute, "h": hour, "d": day, "w": week, "y": year} size = 1 suffix = value[-1] if suffix in sizes: From af4a1bac5088e8083f55eea05f6fad44208a3a51 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 14 Nov 2016 09:52:41 +0000 Subject: [PATCH 28/33] Move .observe() up to the cache to make things neater --- synapse/rest/client/transactions.py | 9 +++--- synapse/rest/client/v1/room.py | 30 ++++---------------- synapse/rest/client/v2_alpha/sendtodevice.py | 5 +--- 3 files changed, 11 insertions(+), 33 deletions(-) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 1db972a378..8d69e12d36 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -67,19 +67,18 @@ class HttpTransactionCache(object): txn_key (str): A key to ensure idempotency should fetch_or_execute be called again at a later point in time. fn (function): A function which returns a tuple of - (response_code, response_dict)d + (response_code, response_dict). *args: Arguments to pass to fn. **kwargs: Keyword arguments to pass to fn. Returns: - synapse.util.async.ObservableDeferred which resolves to a tuple - of (response_code, response_dict). + Deferred which resolves to a tuple of (response_code, response_dict). """ try: - return self.transactions[txn_key] + return self.transactions[txn_key].observe() except KeyError: pass # execute the function instead. deferred = fn(*args, **kwargs) observable = ObservableDeferred(deferred) self.transactions[txn_key] = observable - return observable + return observable.observe() diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0622e64380..3fb1f2deb3 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -53,13 +53,10 @@ class RoomCreateRestServlet(ClientV1RestServlet): client_path_patterns("/createRoom(?:/.*)?$"), self.on_OPTIONS) - @defer.inlineCallbacks def on_PUT(self, request, txn_id): - observable = self.txns.fetch_or_execute_request( + return self.txns.fetch_or_execute_request( request, self.on_POST, request ) - res = yield observable.observe() - defer.returnValue(res) @defer.inlineCallbacks def on_POST(self, request): @@ -208,13 +205,10 @@ class RoomSendEventRestServlet(ClientV1RestServlet): def on_GET(self, request, room_id, event_type, txn_id): return (200, "Not implemented") - @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, txn_id): - observable = self.txns.fetch_or_execute_request( + return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, event_type, txn_id ) - res = yield observable.observe() - defer.returnValue(res) # TODO: Needs unit testing for room ID + alias joins @@ -271,13 +265,10 @@ class JoinRoomAliasServlet(ClientV1RestServlet): defer.returnValue((200, {"room_id": room_id})) - @defer.inlineCallbacks def on_PUT(self, request, room_identifier, txn_id): - observable = self.txns.fetch_or_execute_request( + return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_identifier, txn_id ) - res = yield observable.observe() - defer.returnValue(res) # TODO: Needs unit testing @@ -519,13 +510,10 @@ class RoomForgetRestServlet(ClientV1RestServlet): defer.returnValue((200, {})) - @defer.inlineCallbacks def on_PUT(self, request, room_id, txn_id): - observable = self.txns.fetch_or_execute_request( + return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, txn_id ) - res = yield observable.observe() - defer.returnValue(res) # TODO: Needs unit testing @@ -597,13 +585,10 @@ class RoomMembershipRestServlet(ClientV1RestServlet): return False return True - @defer.inlineCallbacks def on_PUT(self, request, room_id, membership_action, txn_id): - observable = self.txns.fetch_or_execute_request( + return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, membership_action, txn_id ) - res = yield observable.observe() - defer.returnValue(res) class RoomRedactEventRestServlet(ClientV1RestServlet): @@ -635,13 +620,10 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): defer.returnValue((200, {"event_id": event.event_id})) - @defer.inlineCallbacks def on_PUT(self, request, room_id, event_id, txn_id): - observable = self.txns.fetch_or_execute_request( + return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, event_id, txn_id ) - res = yield observable.observe() - defer.returnValue(res) class RoomTypingRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 2ce038c6cd..2187350d42 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -43,13 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): self.txns = HttpTransactionCache() self.device_message_handler = hs.get_device_message_handler() - @defer.inlineCallbacks def on_PUT(self, request, message_type, txn_id): - observable = self.txns.fetch_or_execute_request( + return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) - res = yield observable.observe() - defer.returnValue(res) @defer.inlineCallbacks def _put(self, request, message_type, txn_id): From 3991b4cbdb5f5fbdf61ad6efa879b3881143c214 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 14 Nov 2016 11:19:24 +0000 Subject: [PATCH 29/33] Clean transactions based on time. Add HttpTransactionCache tests. --- synapse/rest/client/transactions.py | 24 +++++-- synapse/rest/client/v1/base.py | 2 +- synapse/rest/client/v2_alpha/sendtodevice.py | 2 +- synapse/util/__init__.py | 10 ++- tests/rest/client/test_transactions.py | 69 ++++++++++++++++++++ 5 files changed, 99 insertions(+), 8 deletions(-) create mode 100644 tests/rest/client/test_transactions.py diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 8d69e12d36..351170edbc 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -41,12 +41,19 @@ def get_transaction_key(request): return request.path + "/" + token +CLEANUP_PERIOD_MS = 1000 * 60 * 30 # 30 mins + + class HttpTransactionCache(object): - def __init__(self): + def __init__(self, clock): + self.clock = clock self.transactions = { - # $txn_key: ObservableDeferred<(res_code, res_json_body)> + # $txn_key: (ObservableDeferred<(res_code, res_json_body)>, timestamp) } + # Try to clean entries every 30 mins. This means entries will exist + # for at *LEAST* 30 mins, and at *MOST* 60 mins. + self.cleaner = self.clock.looping_call(self._cleanup, CLEANUP_PERIOD_MS) def fetch_or_execute_request(self, request, fn, *args, **kwargs): """A helper function for fetch_or_execute which extracts @@ -74,11 +81,18 @@ class HttpTransactionCache(object): Deferred which resolves to a tuple of (response_code, response_dict). """ try: - return self.transactions[txn_key].observe() - except KeyError: + return self.transactions[txn_key][0].observe() + except (KeyError, IndexError): pass # execute the function instead. deferred = fn(*args, **kwargs) observable = ObservableDeferred(deferred) - self.transactions[txn_key] = observable + self.transactions[txn_key] = (observable, self.clock.time_msec()) return observable.observe() + + def _cleanup(self): + now = self.clock.time_msec() + for key in self.transactions.keys(): + ts = self.transactions[key][1] + if now > (ts + CLEANUP_PERIOD_MS): # after cleanup period + del self.transactions[key] diff --git a/synapse/rest/client/v1/base.py b/synapse/rest/client/v1/base.py index 07ff5b218c..c7aa0bbf59 100644 --- a/synapse/rest/client/v1/base.py +++ b/synapse/rest/client/v1/base.py @@ -60,4 +60,4 @@ class ClientV1RestServlet(RestServlet): self.hs = hs self.builder_factory = hs.get_event_builder_factory() self.auth = hs.get_v1auth() - self.txns = HttpTransactionCache() + self.txns = HttpTransactionCache(hs.get_clock()) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 2187350d42..ac660669f3 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -40,7 +40,7 @@ class SendToDeviceRestServlet(servlet.RestServlet): super(SendToDeviceRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.txns = HttpTransactionCache() + self.txns = HttpTransactionCache(hs.get_clock()) self.device_message_handler = hs.get_device_message_handler() def on_PUT(self, request, message_type, txn_id): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 2b3f0bef3c..c05b9450be 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -34,7 +34,7 @@ class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. - TODO(paul): Also move the sleep() functionallity into it + TODO(paul): Also move the sleep() functionality into it """ def time(self): @@ -46,6 +46,14 @@ class Clock(object): return int(self.time() * 1000) def looping_call(self, f, msec): + """Call a function repeatedly. + + Waits `msec` initially before calling `f` for the first time. + + Args: + f(function): The function to call repeatedly. + msec(float): How long to wait between calls in milliseconds. + """ l = task.LoopingCall(f) l.start(msec / 1000.0, now=False) return l diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py new file mode 100644 index 0000000000..d7cea30260 --- /dev/null +++ b/tests/rest/client/test_transactions.py @@ -0,0 +1,69 @@ +from synapse.rest.client.transactions import HttpTransactionCache +from synapse.rest.client.transactions import CLEANUP_PERIOD_MS +from twisted.internet import defer +from mock import Mock, call +from tests import unittest +from tests.utils import MockClock + + +class HttpTransactionCacheTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.cache = HttpTransactionCache(self.clock) + + self.mock_http_response = (200, "GOOD JOB!") + self.mock_key = "foo" + + @defer.inlineCallbacks + def test_executes_given_function(self): + cb = Mock( + return_value=defer.succeed(self.mock_http_response) + ) + res = yield self.cache.fetch_or_execute( + self.mock_key, cb, "some_arg", keyword="arg" + ) + cb.assert_called_once_with("some_arg", keyword="arg") + self.assertEqual(res, self.mock_http_response) + + @defer.inlineCallbacks + def test_deduplicates_based_on_key(self): + cb = Mock( + return_value=defer.succeed(self.mock_http_response) + ) + for i in range(3): # invoke multiple times + res = yield self.cache.fetch_or_execute( + self.mock_key, cb, "some_arg", keyword="arg", changing_args=i + ) + self.assertEqual(res, self.mock_http_response) + # expect only a single call to do the work + cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0) + + @defer.inlineCallbacks + def test_cleans_up(self): + cb = Mock( + return_value=defer.succeed(self.mock_http_response) + ) + yield self.cache.fetch_or_execute( + self.mock_key, cb, "an arg" + ) + # should NOT have cleaned up yet + self.clock.advance_time_msec(CLEANUP_PERIOD_MS / 2) + + yield self.cache.fetch_or_execute( + self.mock_key, cb, "an arg" + ) + # still using cache + cb.assert_called_once_with("an arg") + + self.clock.advance_time_msec(CLEANUP_PERIOD_MS) + + yield self.cache.fetch_or_execute( + self.mock_key, cb, "an arg" + ) + # no longer using cache + self.assertEqual(cb.call_count, 2) + self.assertEqual( + cb.call_args_list, + [call("an arg",), call("an arg",)] + ) From 9d58ccc547233f5bf6a201fb1cf19dcb3cc2c7e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 14 Nov 2016 15:05:04 +0000 Subject: [PATCH 30/33] Bump changelog and version --- CHANGES.rst | 20 ++++++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 1ce58632b8..f2e7adb25f 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,23 @@ +Changes in synapse v0.18.4-rc1 (2016-11-14) +=========================================== + +Changes: + +* Various database efficiency improvements (PR #1188, #1192) +* Update default config to blacklist more internal IPs, thanks to Euan Kemp (PR + #1198) +* Allow specifying duration in minutes in config, thanks to Daniel Dent (PR + #1625) + + +Bug fixes: + +* Fix media repo to set CORs headers on responses (PR #1190) +* Fix registration to not error on non-ascii passwords (PR #1191) +* Fix create event code to limit the number of prev_events (PR #1615) +* Fix bug in transaction ID deduplication (PR #1624) + + Changes in synapse v0.18.3 (2016-11-08) ======================================= diff --git a/synapse/__init__.py b/synapse/__init__.py index d366b69dab..432567a110 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.18.3" +__version__ = "0.18.4" From 544722bad23fc31056b9240189c3cbbbf0ffd3f9 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 18 Nov 2016 17:07:35 +0000 Subject: [PATCH 31/33] Work around client replacing reg params Works around https://github.com/vector-im/vector-android/issues/715 and equivalent for iOS --- synapse/rest/client/v2_alpha/register.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 68d18a9b82..b20d9a1095 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -169,6 +169,18 @@ class RegisterRestServlet(RestServlet): guest_access_token = body.get("guest_access_token", None) + if ( + 'initial_device_display_name' in body and + 'password' not in body + ): + # ignore 'initial_device_display_name' if sent without + # a password to work around a client bug where it sent + # the 'initial_device_display_name' param alone, wiping out + # the original registration params + logger.warn("Ignoring initial_device_display_name without password") + del body['initial_device_display_name'] + + session_id = self.auth_handler.get_session_id(body) registered_user_id = None if session_id: From a2891509439d2ec6d12bcc348293e3e9162cd0df Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 18 Nov 2016 17:15:02 +0000 Subject: [PATCH 32/33] Fix flake8 --- synapse/rest/client/v2_alpha/register.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b20d9a1095..6cfb20866b 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -180,7 +180,6 @@ class RegisterRestServlet(RestServlet): logger.warn("Ignoring initial_device_display_name without password") del body['initial_device_display_name'] - session_id = self.auth_handler.get_session_id(body) registered_user_id = None if session_id: From aac06e8f74bafb061090a69cb134fbf9404b5eed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Nov 2016 10:24:04 +0000 Subject: [PATCH 33/33] Bump changelog --- CHANGES.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index f2e7adb25f..a1a0624674 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,11 @@ +Changes in synapse v0.18.4 (2016-11-22) +======================================= + +Bug fixes: + +* Add workaround for buggy clients that the fail to register (PR #1632) + + Changes in synapse v0.18.4-rc1 (2016-11-14) ===========================================