Hook up the send queue and create a federation sender worker
This commit is contained in:
parent
1587b5a033
commit
ed787cf09e
|
@ -0,0 +1,302 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2016 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import synapse
|
||||||
|
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
from synapse.config._base import ConfigError
|
||||||
|
from synapse.config.logger import setup_logging
|
||||||
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
|
from synapse.crypto import context_factory
|
||||||
|
from synapse.http.site import SynapseSite
|
||||||
|
from synapse.federation import send_queue
|
||||||
|
from synapse.federation.units import Edu
|
||||||
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
|
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||||
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
|
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||||
|
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||||
|
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||||
|
from synapse.storage.engines import create_engine
|
||||||
|
from synapse.storage.presence import UserPresenceState
|
||||||
|
from synapse.util.async import sleep
|
||||||
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
|
from synapse.util.logcontext import LoggingContext
|
||||||
|
from synapse.util.manhole import manhole
|
||||||
|
from synapse.util.rlimit import change_resource_limit
|
||||||
|
from synapse.util.versionstring import get_version_string
|
||||||
|
|
||||||
|
from synapse import events
|
||||||
|
|
||||||
|
from twisted.internet import reactor, defer
|
||||||
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
|
from daemonize import Daemonize
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
import gc
|
||||||
|
import ujson as json
|
||||||
|
|
||||||
|
logger = logging.getLogger("synapse.app.appservice")
|
||||||
|
|
||||||
|
|
||||||
|
class FederationSenderSlaveStore(
|
||||||
|
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
|
||||||
|
SlavedRegistrationStore,
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FederationSenderServer(HomeServer):
|
||||||
|
def get_db_conn(self, run_new_connection=True):
|
||||||
|
# Any param beginning with cp_ is a parameter for adbapi, and should
|
||||||
|
# not be passed to the database engine.
|
||||||
|
db_params = {
|
||||||
|
k: v for k, v in self.db_config.get("args", {}).items()
|
||||||
|
if not k.startswith("cp_")
|
||||||
|
}
|
||||||
|
db_conn = self.database_engine.module.connect(**db_params)
|
||||||
|
|
||||||
|
if run_new_connection:
|
||||||
|
self.database_engine.on_new_connection(db_conn)
|
||||||
|
return db_conn
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
logger.info("Setting up.")
|
||||||
|
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
|
||||||
|
logger.info("Finished setting up.")
|
||||||
|
|
||||||
|
def _listen_http(self, listener_config):
|
||||||
|
port = listener_config["port"]
|
||||||
|
bind_address = listener_config.get("bind_address", "")
|
||||||
|
site_tag = listener_config.get("tag", port)
|
||||||
|
resources = {}
|
||||||
|
for res in listener_config["resources"]:
|
||||||
|
for name in res["names"]:
|
||||||
|
if name == "metrics":
|
||||||
|
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||||
|
|
||||||
|
root_resource = create_resource_tree(resources, Resource())
|
||||||
|
reactor.listenTCP(
|
||||||
|
port,
|
||||||
|
SynapseSite(
|
||||||
|
"synapse.access.http.%s" % (site_tag,),
|
||||||
|
site_tag,
|
||||||
|
listener_config,
|
||||||
|
root_resource,
|
||||||
|
),
|
||||||
|
interface=bind_address
|
||||||
|
)
|
||||||
|
logger.info("Synapse federation_sender now listening on port %d", port)
|
||||||
|
|
||||||
|
def start_listening(self, listeners):
|
||||||
|
for listener in listeners:
|
||||||
|
if listener["type"] == "http":
|
||||||
|
self._listen_http(listener)
|
||||||
|
elif listener["type"] == "manhole":
|
||||||
|
reactor.listenTCP(
|
||||||
|
listener["port"],
|
||||||
|
manhole(
|
||||||
|
username="matrix",
|
||||||
|
password="rabbithole",
|
||||||
|
globals={"hs": self},
|
||||||
|
),
|
||||||
|
interface=listener.get("bind_address", '127.0.0.1')
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warn("Unrecognized listener type: %s", listener["type"])
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def replicate(self):
|
||||||
|
http_client = self.get_simple_http_client()
|
||||||
|
store = self.get_datastore()
|
||||||
|
replication_url = self.config.worker_replication_url
|
||||||
|
send_handler = self._get_send_handler()
|
||||||
|
|
||||||
|
def replicate(results):
|
||||||
|
stream = results.get("events")
|
||||||
|
if stream:
|
||||||
|
# max_stream_id = stream["position"]
|
||||||
|
# TODO
|
||||||
|
pass
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
args = store.stream_positions()
|
||||||
|
args.update(send_handler.stream_positions())
|
||||||
|
args["timeout"] = 30000
|
||||||
|
result = yield http_client.get_json(replication_url, args=args)
|
||||||
|
yield store.process_replication(result)
|
||||||
|
send_handler.process_replication(result)
|
||||||
|
replicate(result)
|
||||||
|
except:
|
||||||
|
logger.exception("Error replicating from %r", replication_url)
|
||||||
|
yield sleep(30)
|
||||||
|
|
||||||
|
def _get_send_handler(self):
|
||||||
|
try:
|
||||||
|
return self._send_handler
|
||||||
|
except AttributeError:
|
||||||
|
self._send_handler = FederationSenderHandler(self)
|
||||||
|
return self._send_handler
|
||||||
|
|
||||||
|
|
||||||
|
def start(config_options):
|
||||||
|
try:
|
||||||
|
config = HomeServerConfig.load_config(
|
||||||
|
"Synapse federation sender", config_options
|
||||||
|
)
|
||||||
|
except ConfigError as e:
|
||||||
|
sys.stderr.write("\n" + e.message + "\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
assert config.worker_app == "synapse.app.federation_sender"
|
||||||
|
|
||||||
|
setup_logging(config.worker_log_config, config.worker_log_file)
|
||||||
|
|
||||||
|
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||||
|
|
||||||
|
database_engine = create_engine(config.database_config)
|
||||||
|
|
||||||
|
if config.send_federation:
|
||||||
|
sys.stderr.write(
|
||||||
|
"\nThe send_federation must be disabled in the main synapse process"
|
||||||
|
"\nbefore they can be run in a separate worker."
|
||||||
|
"\nPlease add ``send_federation: false`` to the main config"
|
||||||
|
"\n"
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Force the pushers to start since they will be disabled in the main config
|
||||||
|
config.send_federation = True
|
||||||
|
|
||||||
|
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||||
|
|
||||||
|
ps = FederationSenderServer(
|
||||||
|
config.server_name,
|
||||||
|
db_config=config.database_config,
|
||||||
|
tls_server_context_factory=tls_server_context_factory,
|
||||||
|
config=config,
|
||||||
|
version_string="Synapse/" + get_version_string(synapse),
|
||||||
|
database_engine=database_engine,
|
||||||
|
)
|
||||||
|
|
||||||
|
ps.setup()
|
||||||
|
ps.start_listening(config.worker_listeners)
|
||||||
|
|
||||||
|
def run():
|
||||||
|
with LoggingContext("run"):
|
||||||
|
logger.info("Running")
|
||||||
|
change_resource_limit(config.soft_file_limit)
|
||||||
|
if config.gc_thresholds:
|
||||||
|
gc.set_threshold(*config.gc_thresholds)
|
||||||
|
reactor.run()
|
||||||
|
|
||||||
|
def start():
|
||||||
|
ps.replicate()
|
||||||
|
ps.get_datastore().start_profiling()
|
||||||
|
ps.get_state_handler().start_caching()
|
||||||
|
|
||||||
|
reactor.callWhenRunning(start)
|
||||||
|
|
||||||
|
if config.worker_daemonize:
|
||||||
|
daemon = Daemonize(
|
||||||
|
app="synapse-federation-sender",
|
||||||
|
pid=config.worker_pid_file,
|
||||||
|
action=run,
|
||||||
|
auto_close_fds=False,
|
||||||
|
verbose=True,
|
||||||
|
logger=logger,
|
||||||
|
)
|
||||||
|
daemon.start()
|
||||||
|
else:
|
||||||
|
run()
|
||||||
|
|
||||||
|
|
||||||
|
class FederationSenderHandler(object):
|
||||||
|
def __init__(self, hs):
|
||||||
|
self.federation_sender = hs.get_federation_sender()
|
||||||
|
|
||||||
|
self._latest_room_serial = -1
|
||||||
|
self._room_serials = {}
|
||||||
|
self._room_typing = {}
|
||||||
|
|
||||||
|
def stream_positions(self):
|
||||||
|
# We must update this token from the response of the previous
|
||||||
|
# sync. In particular, the stream id may "reset" back to zero/a low
|
||||||
|
# value which we *must* use for the next replication request.
|
||||||
|
return {"federation": self._latest_room_serial}
|
||||||
|
|
||||||
|
def process_replication(self, result):
|
||||||
|
stream = result.get("federation")
|
||||||
|
if stream:
|
||||||
|
self._latest_room_serial = int(stream["position"])
|
||||||
|
|
||||||
|
presence_to_send = {}
|
||||||
|
keyed_edus = {}
|
||||||
|
edus = {}
|
||||||
|
failures = {}
|
||||||
|
|
||||||
|
for row in stream["rows"]:
|
||||||
|
position, typ, content_js = row
|
||||||
|
content = json.loads(content_js)
|
||||||
|
|
||||||
|
if typ == send_queue.PRESENCE_TYPE:
|
||||||
|
destination = content["destination"]
|
||||||
|
state = UserPresenceState.from_dict(content["state"])
|
||||||
|
|
||||||
|
presence_to_send.setdefault(destination, []).append(state)
|
||||||
|
elif typ == send_queue.KEYED_EDU_TYPE:
|
||||||
|
key = content["key"]
|
||||||
|
edu = Edu(**content["edu"])
|
||||||
|
|
||||||
|
keyed_edus.setdefault(edu.destination, {})[key] = edu
|
||||||
|
elif typ == send_queue.EDU_TYPE:
|
||||||
|
edu = Edu(**content)
|
||||||
|
|
||||||
|
edus.setdefault(edu.destination, []).append(edu)
|
||||||
|
elif typ == send_queue.FAILURE_TYPE:
|
||||||
|
destination = content["destination"]
|
||||||
|
failure = content["failure"]
|
||||||
|
|
||||||
|
failures.setdefault(destination, []).append(failure)
|
||||||
|
else:
|
||||||
|
raise Exception("Unrecognised federation type: %r", typ)
|
||||||
|
|
||||||
|
for destination, states in presence_to_send.items():
|
||||||
|
self.federation_sender.send_presence(destination, states)
|
||||||
|
|
||||||
|
for destination, edu_map in keyed_edus.items():
|
||||||
|
for key, edu in edu_map.items():
|
||||||
|
self.federation_sender.send_edu(
|
||||||
|
edu.destination, edu.edu_type, edu.content, key=key,
|
||||||
|
)
|
||||||
|
|
||||||
|
for destination, edu_list in edus.items():
|
||||||
|
for edu in edu_list:
|
||||||
|
self.federation_sender.send_edu(
|
||||||
|
edu.destination, edu.edu_type, edu.content, key=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
for destination, failure_list in failures.items():
|
||||||
|
for failure in failure_list:
|
||||||
|
self.federation_sender.send_failure(destination, failure)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
with LoggingContext("main"):
|
||||||
|
start(sys.argv[1:])
|
|
@ -30,6 +30,11 @@ class ServerConfig(Config):
|
||||||
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
|
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
|
||||||
self.public_baseurl = config.get("public_baseurl")
|
self.public_baseurl = config.get("public_baseurl")
|
||||||
|
|
||||||
|
# Whether to send federation traffic out in this process. This only
|
||||||
|
# applies to some federation traffic, and so shouldn't be used to
|
||||||
|
# "disable" federation
|
||||||
|
self.send_federation = config.get("send_federation", True)
|
||||||
|
|
||||||
if self.public_baseurl is not None:
|
if self.public_baseurl is not None:
|
||||||
if self.public_baseurl[-1] != '/':
|
if self.public_baseurl[-1] != '/':
|
||||||
self.public_baseurl += '/'
|
self.public_baseurl += '/'
|
||||||
|
|
|
@ -13,11 +13,20 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from .units import Edu
|
||||||
|
|
||||||
from blist import sorteddict
|
from blist import sorteddict
|
||||||
|
import ujson
|
||||||
|
|
||||||
|
|
||||||
|
PRESENCE_TYPE = "p"
|
||||||
|
KEYED_EDU_TYPE = "k"
|
||||||
|
EDU_TYPE = "e"
|
||||||
|
FAILURE_TYPE = "f"
|
||||||
|
|
||||||
|
|
||||||
class FederationRemoteSendQueue(object):
|
class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
|
@ -68,12 +77,12 @@ class FederationRemoteSendQueue(object):
|
||||||
for key in keys[:i]:
|
for key in keys[:i]:
|
||||||
del self.presence_changed[key]
|
del self.presence_changed[key]
|
||||||
|
|
||||||
user_ids = set()
|
user_ids = set(
|
||||||
for _, states in self.presence_changed.values():
|
user_id for uids in self.presence_changed.values() for _, user_id in uids
|
||||||
user_ids.update(s.user_id for s in user_ids)
|
)
|
||||||
|
|
||||||
to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
|
to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
|
||||||
for user_id in self.to_del:
|
for user_id in to_del:
|
||||||
del self.presence_map[user_id]
|
del self.presence_map[user_id]
|
||||||
|
|
||||||
# Delete things out of keyed edus
|
# Delete things out of keyed edus
|
||||||
|
@ -102,47 +111,77 @@ class FederationRemoteSendQueue(object):
|
||||||
for key in keys[:i]:
|
for key in keys[:i]:
|
||||||
del self.failures[key]
|
del self.failures[key]
|
||||||
|
|
||||||
def send_edu(self, edu, key=None):
|
def send_edu(self, destination, edu_type, content, key=None):
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
|
|
||||||
|
edu = Edu(
|
||||||
|
origin=self.server_name,
|
||||||
|
destination=destination,
|
||||||
|
edu_type=edu_type,
|
||||||
|
content=content,
|
||||||
|
)
|
||||||
|
|
||||||
if key:
|
if key:
|
||||||
self.keyed_edu[(edu.destination, key)] = edu
|
self.keyed_edu[(destination, key)] = edu
|
||||||
self.keyed_edu_changed[pos] = (edu.destination, key)
|
self.keyed_edu_changed[pos] = (destination, key)
|
||||||
else:
|
else:
|
||||||
self.edus[pos] = edu
|
self.edus[pos] = edu
|
||||||
|
|
||||||
def send_presence(self, destination, states):
|
def send_presence(self, destination, states):
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
|
|
||||||
self.presence_map.presence_map.update({
|
self.presence_map.update({
|
||||||
state.user_id: state
|
state.user_id: state
|
||||||
for state in states
|
for state in states
|
||||||
})
|
})
|
||||||
|
|
||||||
self.presence_changed[pos] = (destination, [
|
self.presence_changed[pos] = [
|
||||||
state.user_id for state in states
|
(destination, state.user_id) for state in states
|
||||||
])
|
]
|
||||||
|
|
||||||
def send_failure(self, failure, destination):
|
def send_failure(self, failure, destination):
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
|
|
||||||
self.failures[pos] = (destination, failure)
|
self.failures[pos] = (destination, str(failure))
|
||||||
|
|
||||||
|
def send_pdu(self, pdu, destinations):
|
||||||
|
# This gets sent down a separate path
|
||||||
|
pass
|
||||||
|
|
||||||
def notify_new_device_message(self, destination):
|
def notify_new_device_message(self, destination):
|
||||||
# TODO
|
# TODO
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_replication_rows(self, token):
|
def get_current_token(self):
|
||||||
|
return self.pos - 1
|
||||||
|
|
||||||
|
def get_replication_rows(self, token, limit):
|
||||||
|
# TODO: Handle limit.
|
||||||
|
|
||||||
|
# To handle restarts where we wrap around
|
||||||
|
if token > self.pos:
|
||||||
|
token = -1
|
||||||
|
|
||||||
rows = []
|
rows = []
|
||||||
|
|
||||||
|
# There should be only one reader, so lets delete everything its
|
||||||
|
# acknowledged its seen.
|
||||||
|
self._clear_queue_before_pos(token)
|
||||||
|
|
||||||
# Fetch changed presence
|
# Fetch changed presence
|
||||||
keys = self.presence_changed.keys()
|
keys = self.presence_changed.keys()
|
||||||
i = keys.bisect_right(token)
|
i = keys.bisect_right(token)
|
||||||
dest_user_ids = set((k, self.presence_changed[k]) for k in keys[i:])
|
dest_user_ids = set(
|
||||||
|
(pos, dest_user_id)
|
||||||
|
for pos in keys[i:]
|
||||||
|
for dest_user_id in self.presence_changed[pos]
|
||||||
|
)
|
||||||
|
|
||||||
for (key, (dest, user_ids)) in dest_user_ids:
|
for (key, (dest, user_id)) in dest_user_ids:
|
||||||
for user_id in user_ids:
|
rows.append((key, PRESENCE_TYPE, ujson.dumps({
|
||||||
rows.append((key, dest, "p", self.presence_map[user_id]))
|
"destination": dest,
|
||||||
|
"state": self.presence_map[user_id].as_dict(),
|
||||||
|
})))
|
||||||
|
|
||||||
# Fetch changes keyed edus
|
# Fetch changes keyed edus
|
||||||
keys = self.keyed_edu_changed.keys()
|
keys = self.keyed_edu_changed.keys()
|
||||||
|
@ -150,7 +189,12 @@ class FederationRemoteSendQueue(object):
|
||||||
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
|
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
|
||||||
|
|
||||||
for (pos, edu_key) in keyed_edus:
|
for (pos, edu_key) in keyed_edus:
|
||||||
rows.append((pos, edu_key, "k", self.keyed_edu[edu_key]))
|
rows.append(
|
||||||
|
(pos, KEYED_EDU_TYPE, ujson.dumps({
|
||||||
|
"key": edu_key,
|
||||||
|
"edu": self.keyed_edu[edu_key].get_dict(),
|
||||||
|
}))
|
||||||
|
)
|
||||||
|
|
||||||
# Fetch changed edus
|
# Fetch changed edus
|
||||||
keys = self.edus.keys()
|
keys = self.edus.keys()
|
||||||
|
@ -158,7 +202,7 @@ class FederationRemoteSendQueue(object):
|
||||||
edus = set((k, self.edus[k]) for k in keys[i:])
|
edus = set((k, self.edus[k]) for k in keys[i:])
|
||||||
|
|
||||||
for (pos, edu) in edus:
|
for (pos, edu) in edus:
|
||||||
rows.append((pos, edu.destination, "e", edu))
|
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict())))
|
||||||
|
|
||||||
# Fetch changed failures
|
# Fetch changed failures
|
||||||
keys = self.failures.keys()
|
keys = self.failures.keys()
|
||||||
|
@ -166,7 +210,10 @@ class FederationRemoteSendQueue(object):
|
||||||
failures = set((k, self.failures[k]) for k in keys[i:])
|
failures = set((k, self.failures[k]) for k in keys[i:])
|
||||||
|
|
||||||
for (pos, (destination, failure)) in failures:
|
for (pos, (destination, failure)) in failures:
|
||||||
rows.append((pos, destination, "f", failure))
|
rows.append((pos, None, FAILURE_TYPE, ujson.dumps({
|
||||||
|
"destination": destination,
|
||||||
|
"failure": failure,
|
||||||
|
})))
|
||||||
|
|
||||||
# Sort rows based on pos
|
# Sort rows based on pos
|
||||||
rows.sort()
|
rows.sort()
|
||||||
|
|
|
@ -44,6 +44,7 @@ STREAM_NAMES = (
|
||||||
("caches",),
|
("caches",),
|
||||||
("to_device",),
|
("to_device",),
|
||||||
("public_rooms",),
|
("public_rooms",),
|
||||||
|
("federation",),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -116,8 +117,10 @@ class ReplicationResource(Resource):
|
||||||
self.sources = hs.get_event_sources()
|
self.sources = hs.get_event_sources()
|
||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
self.typing_handler = hs.get_typing_handler()
|
self.typing_handler = hs.get_typing_handler()
|
||||||
|
self.federation_sender = hs.get_federation_sender()
|
||||||
self.notifier = hs.notifier
|
self.notifier = hs.notifier
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
self.config = hs.get_config()
|
||||||
|
|
||||||
self.putChild("remove_pushers", PusherResource(hs))
|
self.putChild("remove_pushers", PusherResource(hs))
|
||||||
self.putChild("syncing_users", PresenceResource(hs))
|
self.putChild("syncing_users", PresenceResource(hs))
|
||||||
|
@ -134,6 +137,7 @@ class ReplicationResource(Resource):
|
||||||
pushers_token = self.store.get_pushers_stream_token()
|
pushers_token = self.store.get_pushers_stream_token()
|
||||||
caches_token = self.store.get_cache_stream_token()
|
caches_token = self.store.get_cache_stream_token()
|
||||||
public_rooms_token = self.store.get_current_public_room_stream_id()
|
public_rooms_token = self.store.get_current_public_room_stream_id()
|
||||||
|
federation_token = self.federation_sender.get_current_token()
|
||||||
|
|
||||||
defer.returnValue(_ReplicationToken(
|
defer.returnValue(_ReplicationToken(
|
||||||
room_stream_token,
|
room_stream_token,
|
||||||
|
@ -148,6 +152,7 @@ class ReplicationResource(Resource):
|
||||||
caches_token,
|
caches_token,
|
||||||
int(stream_token.to_device_key),
|
int(stream_token.to_device_key),
|
||||||
int(public_rooms_token),
|
int(public_rooms_token),
|
||||||
|
int(federation_token),
|
||||||
))
|
))
|
||||||
|
|
||||||
@request_handler()
|
@request_handler()
|
||||||
|
@ -202,6 +207,7 @@ class ReplicationResource(Resource):
|
||||||
yield self.caches(writer, current_token, limit, request_streams)
|
yield self.caches(writer, current_token, limit, request_streams)
|
||||||
yield self.to_device(writer, current_token, limit, request_streams)
|
yield self.to_device(writer, current_token, limit, request_streams)
|
||||||
yield self.public_rooms(writer, current_token, limit, request_streams)
|
yield self.public_rooms(writer, current_token, limit, request_streams)
|
||||||
|
self.federation(writer, current_token, limit, request_streams)
|
||||||
self.streams(writer, current_token, request_streams)
|
self.streams(writer, current_token, request_streams)
|
||||||
|
|
||||||
logger.debug("Replicated %d rows", writer.total)
|
logger.debug("Replicated %d rows", writer.total)
|
||||||
|
@ -465,6 +471,23 @@ class ReplicationResource(Resource):
|
||||||
"position", "room_id", "visibility"
|
"position", "room_id", "visibility"
|
||||||
), position=upto_token)
|
), position=upto_token)
|
||||||
|
|
||||||
|
def federation(self, writer, current_token, limit, request_streams):
|
||||||
|
if self.config.send_federation:
|
||||||
|
return
|
||||||
|
|
||||||
|
current_position = current_token.federation
|
||||||
|
|
||||||
|
federation = request_streams.get("federation")
|
||||||
|
|
||||||
|
if federation is not None and federation != current_position:
|
||||||
|
federation_rows = self.federation_sender.get_replication_rows(
|
||||||
|
federation, limit,
|
||||||
|
)
|
||||||
|
upto_token = _position_from_rows(federation_rows, current_position)
|
||||||
|
writer.write_header_and_rows("federation", federation_rows, (
|
||||||
|
"position", "type", "content",
|
||||||
|
), position=upto_token)
|
||||||
|
|
||||||
|
|
||||||
class _Writer(object):
|
class _Writer(object):
|
||||||
"""Writes the streams as a JSON object as the response to the request"""
|
"""Writes the streams as a JSON object as the response to the request"""
|
||||||
|
@ -497,6 +520,7 @@ class _Writer(object):
|
||||||
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
|
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
|
||||||
"events", "presence", "typing", "receipts", "account_data", "backfill",
|
"events", "presence", "typing", "receipts", "account_data", "backfill",
|
||||||
"push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
|
"push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
|
||||||
|
"federation",
|
||||||
))):
|
))):
|
||||||
__slots__ = []
|
__slots__ = []
|
||||||
|
|
||||||
|
|
|
@ -29,9 +29,14 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
|
||||||
"DeviceInboxStreamChangeCache",
|
"DeviceInboxStreamChangeCache",
|
||||||
self._device_inbox_id_gen.get_current_token()
|
self._device_inbox_id_gen.get_current_token()
|
||||||
)
|
)
|
||||||
|
self._device_federation_outbox_stream_cache = StreamChangeCache(
|
||||||
|
"DeviceFederationOutboxStreamChangeCache",
|
||||||
|
self._device_inbox_id_gen.get_current_token()
|
||||||
|
)
|
||||||
|
|
||||||
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
|
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
|
||||||
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
|
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
|
||||||
|
get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
|
||||||
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
|
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
|
|
|
@ -25,6 +25,9 @@ class TransactionStore(BaseSlavedStore):
|
||||||
].orig
|
].orig
|
||||||
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
|
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
|
||||||
|
|
||||||
|
def prep_send_transaction(self, transaction_id, destination, origin_server_ts):
|
||||||
|
return []
|
||||||
|
|
||||||
# For now, don't record the destination rety timings
|
# For now, don't record the destination rety timings
|
||||||
def set_destination_retry_timings(*args, **kwargs):
|
def set_destination_retry_timings(*args, **kwargs):
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
|
@ -32,6 +32,7 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
|
||||||
from synapse.crypto.keyring import Keyring
|
from synapse.crypto.keyring import Keyring
|
||||||
from synapse.events.builder import EventBuilderFactory
|
from synapse.events.builder import EventBuilderFactory
|
||||||
from synapse.federation import initialize_http_replication
|
from synapse.federation import initialize_http_replication
|
||||||
|
from synapse.federation.send_queue import FederationRemoteSendQueue
|
||||||
from synapse.federation.transport.client import TransportLayerClient
|
from synapse.federation.transport.client import TransportLayerClient
|
||||||
from synapse.federation.transaction_queue import TransactionQueue
|
from synapse.federation.transaction_queue import TransactionQueue
|
||||||
from synapse.handlers import Handlers
|
from synapse.handlers import Handlers
|
||||||
|
@ -273,7 +274,10 @@ class HomeServer(object):
|
||||||
return TransportLayerClient(self)
|
return TransportLayerClient(self)
|
||||||
|
|
||||||
def build_federation_sender(self):
|
def build_federation_sender(self):
|
||||||
|
if self.config.send_federation:
|
||||||
return TransactionQueue(self)
|
return TransactionQueue(self)
|
||||||
|
else:
|
||||||
|
return FederationRemoteSendQueue(self)
|
||||||
|
|
||||||
def remove_pusher(self, app_id, push_key, user_id):
|
def remove_pusher(self, app_id, push_key, user_id):
|
||||||
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
||||||
|
|
|
@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState",
|
||||||
status_msg (str): User set status message.
|
status_msg (str): User set status message.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def as_dict(self):
|
||||||
|
return dict(self._asdict())
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_dict(d):
|
||||||
|
return UserPresenceState(**d)
|
||||||
|
|
||||||
def copy_and_replace(self, **kwargs):
|
def copy_and_replace(self, **kwargs):
|
||||||
return self._replace(**kwargs)
|
return self._replace(**kwargs)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue