Merge pull request #967 from matrix-org/erikj/fed_reader
Split out the federation reading portions into a separate.
This commit is contained in:
commit
342e072024
|
@ -69,8 +69,8 @@ cd sytest
|
||||||
|
|
||||||
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
||||||
|
|
||||||
: ${PORT_BASE:=8000}
|
: ${PORT_BASE:=20000}
|
||||||
: ${PORT_COUNT=20}
|
: ${PORT_COUNT=100}
|
||||||
|
|
||||||
./jenkins/prep_sytest_for_postgres.sh
|
./jenkins/prep_sytest_for_postgres.sh
|
||||||
|
|
||||||
|
@ -82,6 +82,7 @@ echo >&2 "Running sytest with PostgreSQL";
|
||||||
--dendron $WORKSPACE/dendron/bin/dendron \
|
--dendron $WORKSPACE/dendron/bin/dendron \
|
||||||
--pusher \
|
--pusher \
|
||||||
--synchrotron \
|
--synchrotron \
|
||||||
|
--federation-reader \
|
||||||
--port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1))
|
--port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1))
|
||||||
|
|
||||||
cd ..
|
cd ..
|
||||||
|
|
|
@ -43,8 +43,8 @@ cd sytest
|
||||||
|
|
||||||
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
||||||
|
|
||||||
: ${PORT_BASE:=8000}
|
: ${PORT_BASE:=20000}
|
||||||
: ${PORT_COUNT=20}
|
: ${PORT_COUNT=100}
|
||||||
|
|
||||||
./jenkins/prep_sytest_for_postgres.sh
|
./jenkins/prep_sytest_for_postgres.sh
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,9 @@ cd sytest
|
||||||
|
|
||||||
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
||||||
|
|
||||||
: ${PORT_COUNT=20}
|
: ${PORT_BASE:=20000}
|
||||||
: ${PORT_BASE:=8000}
|
: ${PORT_COUNT=100}
|
||||||
|
|
||||||
./jenkins/install_and_run.sh --coverage \
|
./jenkins/install_and_run.sh --coverage \
|
||||||
--python $TOX_BIN/python \
|
--python $TOX_BIN/python \
|
||||||
--synapse-directory $WORKSPACE \
|
--synapse-directory $WORKSPACE \
|
||||||
|
|
|
@ -0,0 +1,206 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2016 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import synapse
|
||||||
|
|
||||||
|
from synapse.config._base import ConfigError
|
||||||
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
|
from synapse.config.logger import setup_logging
|
||||||
|
from synapse.http.site import SynapseSite
|
||||||
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
|
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||||
|
from synapse.replication.slave.storage.room import RoomStore
|
||||||
|
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||||
|
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
from synapse.storage.engines import create_engine
|
||||||
|
from synapse.util.async import sleep
|
||||||
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
|
from synapse.util.logcontext import LoggingContext
|
||||||
|
from synapse.util.manhole import manhole
|
||||||
|
from synapse.util.rlimit import change_resource_limit
|
||||||
|
from synapse.util.versionstring import get_version_string
|
||||||
|
from synapse.api.urls import FEDERATION_PREFIX
|
||||||
|
from synapse.federation.transport.server import TransportLayerServer
|
||||||
|
from synapse.crypto import context_factory
|
||||||
|
|
||||||
|
|
||||||
|
from twisted.internet import reactor, defer
|
||||||
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
|
from daemonize import Daemonize
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
import gc
|
||||||
|
|
||||||
|
logger = logging.getLogger("synapse.app.federation_reader")
|
||||||
|
|
||||||
|
|
||||||
|
class FederationReaderSlavedStore(
|
||||||
|
SlavedEventStore,
|
||||||
|
SlavedKeyStore,
|
||||||
|
RoomStore,
|
||||||
|
DirectoryStore,
|
||||||
|
TransactionStore,
|
||||||
|
BaseSlavedStore,
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FederationReaderServer(HomeServer):
|
||||||
|
def get_db_conn(self, run_new_connection=True):
|
||||||
|
# Any param beginning with cp_ is a parameter for adbapi, and should
|
||||||
|
# not be passed to the database engine.
|
||||||
|
db_params = {
|
||||||
|
k: v for k, v in self.db_config.get("args", {}).items()
|
||||||
|
if not k.startswith("cp_")
|
||||||
|
}
|
||||||
|
db_conn = self.database_engine.module.connect(**db_params)
|
||||||
|
|
||||||
|
if run_new_connection:
|
||||||
|
self.database_engine.on_new_connection(db_conn)
|
||||||
|
return db_conn
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
logger.info("Setting up.")
|
||||||
|
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
|
||||||
|
logger.info("Finished setting up.")
|
||||||
|
|
||||||
|
def _listen_http(self, listener_config):
|
||||||
|
port = listener_config["port"]
|
||||||
|
bind_address = listener_config.get("bind_address", "")
|
||||||
|
site_tag = listener_config.get("tag", port)
|
||||||
|
resources = {}
|
||||||
|
for res in listener_config["resources"]:
|
||||||
|
for name in res["names"]:
|
||||||
|
if name == "metrics":
|
||||||
|
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||||
|
elif name == "federation":
|
||||||
|
resources.update({
|
||||||
|
FEDERATION_PREFIX: TransportLayerServer(self),
|
||||||
|
})
|
||||||
|
|
||||||
|
root_resource = create_resource_tree(resources, Resource())
|
||||||
|
reactor.listenTCP(
|
||||||
|
port,
|
||||||
|
SynapseSite(
|
||||||
|
"synapse.access.http.%s" % (site_tag,),
|
||||||
|
site_tag,
|
||||||
|
listener_config,
|
||||||
|
root_resource,
|
||||||
|
),
|
||||||
|
interface=bind_address
|
||||||
|
)
|
||||||
|
logger.info("Synapse federation reader now listening on port %d", port)
|
||||||
|
|
||||||
|
def start_listening(self, listeners):
|
||||||
|
for listener in listeners:
|
||||||
|
if listener["type"] == "http":
|
||||||
|
self._listen_http(listener)
|
||||||
|
elif listener["type"] == "manhole":
|
||||||
|
reactor.listenTCP(
|
||||||
|
listener["port"],
|
||||||
|
manhole(
|
||||||
|
username="matrix",
|
||||||
|
password="rabbithole",
|
||||||
|
globals={"hs": self},
|
||||||
|
),
|
||||||
|
interface=listener.get("bind_address", '127.0.0.1')
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warn("Unrecognized listener type: %s", listener["type"])
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def replicate(self):
|
||||||
|
http_client = self.get_simple_http_client()
|
||||||
|
store = self.get_datastore()
|
||||||
|
replication_url = self.config.worker_replication_url
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
args = store.stream_positions()
|
||||||
|
args["timeout"] = 30000
|
||||||
|
result = yield http_client.get_json(replication_url, args=args)
|
||||||
|
yield store.process_replication(result)
|
||||||
|
except:
|
||||||
|
logger.exception("Error replicating from %r", replication_url)
|
||||||
|
yield sleep(5)
|
||||||
|
|
||||||
|
|
||||||
|
def start(config_options):
|
||||||
|
try:
|
||||||
|
config = HomeServerConfig.load_config(
|
||||||
|
"Synapse federation reader", config_options
|
||||||
|
)
|
||||||
|
except ConfigError as e:
|
||||||
|
sys.stderr.write("\n" + e.message + "\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
assert config.worker_app == "synapse.app.federation_reader"
|
||||||
|
|
||||||
|
setup_logging(config.worker_log_config, config.worker_log_file)
|
||||||
|
|
||||||
|
database_engine = create_engine(config.database_config)
|
||||||
|
|
||||||
|
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||||
|
|
||||||
|
ss = FederationReaderServer(
|
||||||
|
config.server_name,
|
||||||
|
db_config=config.database_config,
|
||||||
|
tls_server_context_factory=tls_server_context_factory,
|
||||||
|
config=config,
|
||||||
|
version_string=get_version_string("Synapse", synapse),
|
||||||
|
database_engine=database_engine,
|
||||||
|
)
|
||||||
|
|
||||||
|
ss.setup()
|
||||||
|
ss.get_handlers()
|
||||||
|
ss.start_listening(config.worker_listeners)
|
||||||
|
|
||||||
|
def run():
|
||||||
|
with LoggingContext("run"):
|
||||||
|
logger.info("Running")
|
||||||
|
change_resource_limit(config.soft_file_limit)
|
||||||
|
if config.gc_thresholds:
|
||||||
|
gc.set_threshold(*config.gc_thresholds)
|
||||||
|
reactor.run()
|
||||||
|
|
||||||
|
def start():
|
||||||
|
ss.get_datastore().start_profiling()
|
||||||
|
ss.replicate()
|
||||||
|
|
||||||
|
reactor.callWhenRunning(start)
|
||||||
|
|
||||||
|
if config.worker_daemonize:
|
||||||
|
daemon = Daemonize(
|
||||||
|
app="synapse-federation-reader",
|
||||||
|
pid=config.worker_pid_file,
|
||||||
|
action=run,
|
||||||
|
auto_close_fds=False,
|
||||||
|
verbose=True,
|
||||||
|
logger=logger,
|
||||||
|
)
|
||||||
|
daemon.start()
|
||||||
|
else:
|
||||||
|
run()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
with LoggingContext("main"):
|
||||||
|
start(sys.argv[1:])
|
|
@ -0,0 +1,23 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015, 2016 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ._base import BaseSlavedStore
|
||||||
|
from synapse.storage.directory import DirectoryStore
|
||||||
|
|
||||||
|
|
||||||
|
class DirectoryStore(BaseSlavedStore):
|
||||||
|
get_aliases_for_room = DirectoryStore.__dict__[
|
||||||
|
"get_aliases_for_room"
|
||||||
|
].orig
|
|
@ -145,6 +145,15 @@ class SlavedEventStore(BaseSlavedStore):
|
||||||
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
||||||
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
|
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
|
||||||
|
|
||||||
|
get_backfill_events = DataStore.get_backfill_events.__func__
|
||||||
|
_get_backfill_events = DataStore._get_backfill_events.__func__
|
||||||
|
get_missing_events = DataStore.get_missing_events.__func__
|
||||||
|
_get_missing_events = DataStore._get_missing_events.__func__
|
||||||
|
|
||||||
|
get_auth_chain = DataStore.get_auth_chain.__func__
|
||||||
|
get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
|
||||||
|
_get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedEventStore, self).stream_positions()
|
result = super(SlavedEventStore, self).stream_positions()
|
||||||
result["events"] = self._stream_id_gen.get_current_token()
|
result["events"] = self._stream_id_gen.get_current_token()
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015, 2016 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ._base import BaseSlavedStore
|
||||||
|
from synapse.storage import DataStore
|
||||||
|
from synapse.storage.keys import KeyStore
|
||||||
|
|
||||||
|
|
||||||
|
class SlavedKeyStore(BaseSlavedStore):
|
||||||
|
_get_server_verify_key = KeyStore.__dict__[
|
||||||
|
"_get_server_verify_key"
|
||||||
|
]
|
||||||
|
|
||||||
|
get_server_verify_keys = DataStore.get_server_verify_keys.__func__
|
||||||
|
store_server_verify_key = DataStore.store_server_verify_key.__func__
|
||||||
|
|
||||||
|
get_server_certificate = DataStore.get_server_certificate.__func__
|
||||||
|
store_server_certificate = DataStore.store_server_certificate.__func__
|
||||||
|
|
||||||
|
get_server_keys_json = DataStore.get_server_keys_json.__func__
|
||||||
|
store_server_keys_json = DataStore.store_server_keys_json.__func__
|
|
@ -0,0 +1,21 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015, 2016 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ._base import BaseSlavedStore
|
||||||
|
from synapse.storage import DataStore
|
||||||
|
|
||||||
|
|
||||||
|
class RoomStore(BaseSlavedStore):
|
||||||
|
get_public_room_ids = DataStore.get_public_room_ids.__func__
|
|
@ -0,0 +1,30 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015, 2016 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
from ._base import BaseSlavedStore
|
||||||
|
from synapse.storage import DataStore
|
||||||
|
from synapse.storage.transactions import TransactionStore
|
||||||
|
|
||||||
|
|
||||||
|
class TransactionStore(BaseSlavedStore):
|
||||||
|
get_destination_retry_timings = TransactionStore.__dict__[
|
||||||
|
"get_destination_retry_timings"
|
||||||
|
].orig
|
||||||
|
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
|
||||||
|
|
||||||
|
# For now, don't record the destination rety timings
|
||||||
|
def set_destination_retry_timings(*args, **kwargs):
|
||||||
|
return defer.succeed(None)
|
|
@ -22,6 +22,10 @@ import OpenSSL
|
||||||
from signedjson.key import decode_verify_key_bytes
|
from signedjson.key import decode_verify_key_bytes
|
||||||
import hashlib
|
import hashlib
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class KeyStore(SQLBaseStore):
|
class KeyStore(SQLBaseStore):
|
||||||
"""Persistence for signature verification keys and tls X.509 certificates
|
"""Persistence for signature verification keys and tls X.509 certificates
|
||||||
|
@ -74,22 +78,22 @@ class KeyStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
@cachedInlineCallbacks()
|
@cachedInlineCallbacks()
|
||||||
def get_all_server_verify_keys(self, server_name):
|
def _get_server_verify_key(self, server_name, key_id):
|
||||||
rows = yield self._simple_select_list(
|
verify_key_bytes = yield self._simple_select_one_onecol(
|
||||||
table="server_signature_keys",
|
table="server_signature_keys",
|
||||||
keyvalues={
|
keyvalues={
|
||||||
"server_name": server_name,
|
"server_name": server_name,
|
||||||
|
"key_id": key_id,
|
||||||
},
|
},
|
||||||
retcols=["key_id", "verify_key"],
|
retcol="verify_key",
|
||||||
desc="get_all_server_verify_keys",
|
desc="_get_server_verify_key",
|
||||||
|
allow_none=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue({
|
if verify_key_bytes:
|
||||||
row["key_id"]: decode_verify_key_bytes(
|
defer.returnValue(decode_verify_key_bytes(
|
||||||
row["key_id"], str(row["verify_key"])
|
key_id, str(verify_key_bytes)
|
||||||
)
|
))
|
||||||
for row in rows
|
|
||||||
})
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_server_verify_keys(self, server_name, key_ids):
|
def get_server_verify_keys(self, server_name, key_ids):
|
||||||
|
@ -101,12 +105,12 @@ class KeyStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
(list of VerifyKey): The verification keys.
|
(list of VerifyKey): The verification keys.
|
||||||
"""
|
"""
|
||||||
keys = yield self.get_all_server_verify_keys(server_name)
|
keys = {}
|
||||||
defer.returnValue({
|
for key_id in key_ids:
|
||||||
k: keys[k]
|
key = yield self._get_server_verify_key(server_name, key_id)
|
||||||
for k in key_ids
|
if key:
|
||||||
if k in keys and keys[k]
|
keys[key_id] = key
|
||||||
})
|
defer.returnValue(keys)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def store_server_verify_key(self, server_name, from_server, time_now_ms,
|
def store_server_verify_key(self, server_name, from_server, time_now_ms,
|
||||||
|
@ -133,8 +137,6 @@ class KeyStore(SQLBaseStore):
|
||||||
desc="store_server_verify_key",
|
desc="store_server_verify_key",
|
||||||
)
|
)
|
||||||
|
|
||||||
self.get_all_server_verify_keys.invalidate((server_name,))
|
|
||||||
|
|
||||||
def store_server_keys_json(self, server_name, key_id, from_server,
|
def store_server_keys_json(self, server_name, key_id, from_server,
|
||||||
ts_now_ms, ts_expires_ms, key_json_bytes):
|
ts_now_ms, ts_expires_ms, key_json_bytes):
|
||||||
"""Stores the JSON bytes for a set of keys from a server
|
"""Stores the JSON bytes for a set of keys from a server
|
||||||
|
|
Loading…
Reference in New Issue