Use new federation_sender DI

This commit is contained in:
Erik Johnston 2016-11-16 14:28:03 +00:00
parent 847d5db1d1
commit 59ef517e6b
7 changed files with 26 additions and 63 deletions

View File

@ -44,10 +44,6 @@ logger = logging.getLogger(__name__)
# synapse.federation.federation_client is a silly name # synapse.federation.federation_client is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.client") metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
sent_edus_counter = metrics.register_counter("sent_edus")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
@ -91,51 +87,6 @@ class FederationClient(FederationBase):
self._get_pdu_cache.start() self._get_pdu_cache.start()
@log_function
def send_pdu(self, pdu, destinations):
"""Informs the replication layer about a new PDU generated within the
home server that should be transmitted to others.
TODO: Figure out when we should actually resolve the deferred.
Args:
pdu (Pdu): The new Pdu.
Returns:
Deferred: Completes when we have successfully processed the PDU
and replicated it to any interested remote home servers.
"""
sent_pdus_destination_dist.inc_by(len(destinations))
logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id)
# TODO, add errback, etc.
self._transaction_queue.send_pdu(pdu, destinations)
logger.debug(
"[%s] transaction_layer.send_pdu... done",
pdu.event_id
)
def send_presence(self, destination, states):
if destination != self.server_name:
self._transaction_queue.send_presence(destination, states)
@log_function
def send_edu(self, destination, edu_type, content, key=None):
self._transaction_queue.send_edu(destination, edu_type, content, key=key)
@log_function
def send_device_messages(self, destination):
"""Sends the device messages in the local database to the remote
destination"""
self._transaction_queue.send_device_messages(destination)
@log_function
def send_failure(self, failure, destination):
self._transaction_queue.send_failure(failure, destination)
return defer.succeed(None)
@log_function @log_function
def make_query(self, destination, query_type, args, def make_query(self, destination, query_type, args,
retry_on_dns_fail=False): retry_on_dns_fail=False):

View File

@ -36,6 +36,12 @@ logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__) metrics = synapse.metrics.get_metrics_for(__name__)
client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_pdus_destination_dist = client_metrics.register_distribution(
"sent_pdu_destinations"
)
sent_edus_counter = client_metrics.register_counter("sent_edus")
class TransactionQueue(object): class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at """This class makes sure we only have one transaction in flight at
@ -135,6 +141,8 @@ class TransactionQueue(object):
if not destinations: if not destinations:
return return
sent_pdus_destination_dist.inc_by(len(destinations))
for destination in destinations: for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append( self.pending_pdus_by_dest.setdefault(destination, []).append(
(pdu, order) (pdu, order)
@ -167,6 +175,8 @@ class TransactionQueue(object):
if not self.can_send_to(destination): if not self.can_send_to(destination):
return return
sent_edus_counter.inc()
if key: if key:
self.pending_edus_keyed_by_dest.setdefault( self.pending_edus_keyed_by_dest.setdefault(
destination, {} destination, {}

View File

@ -34,9 +34,9 @@ class DeviceMessageHandler(object):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id self.is_mine_id = hs.is_mine_id
self.federation = hs.get_replication_layer() self.federation = hs.get_federation_sender()
self.federation.register_edu_handler( hs.get_replication_layer().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu "m.direct_to_device", self.on_direct_to_device_edu
) )

View File

@ -71,6 +71,7 @@ class FederationHandler(BaseHandler):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.replication_layer = hs.get_replication_layer() self.replication_layer = hs.get_replication_layer()
self.federation_sender = hs.get_federation_sender()
self.state_handler = hs.get_state_handler() self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname self.server_name = hs.hostname
self.keyring = hs.get_keyring() self.keyring = hs.get_keyring()
@ -94,7 +95,7 @@ class FederationHandler(BaseHandler):
processing. processing.
""" """
return self.replication_layer.send_pdu(event, destinations) return self.federation_sender.send_pdu(event, destinations)
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
@ -847,7 +848,7 @@ class FederationHandler(BaseHandler):
event.signatures, event.signatures,
) )
self.replication_layer.send_pdu(new_pdu, destinations) self.federation_sender.send_pdu(new_pdu, destinations)
state_ids = context.prev_state_ids.values() state_ids = context.prev_state_ids.values()
auth_chain = yield self.store.get_auth_chain(set( auth_chain = yield self.store.get_auth_chain(set(
@ -1071,7 +1072,7 @@ class FederationHandler(BaseHandler):
event.signatures, event.signatures,
) )
self.replication_layer.send_pdu(new_pdu, destinations) self.federation_sender.send_pdu(new_pdu, destinations)
defer.returnValue(None) defer.returnValue(None)

View File

@ -91,28 +91,29 @@ class PresenceHandler(object):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.wheel_timer = WheelTimer() self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.federation = hs.get_replication_layer() self.replication = hs.get_replication_layer()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.federation.register_edu_handler( self.replication.register_edu_handler(
"m.presence", self.incoming_presence "m.presence", self.incoming_presence
) )
self.federation.register_edu_handler( self.replication.register_edu_handler(
"m.presence_invite", "m.presence_invite",
lambda origin, content: self.invite_presence( lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]), observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]), observer_user=UserID.from_string(content["observer_user"]),
) )
) )
self.federation.register_edu_handler( self.replication.register_edu_handler(
"m.presence_accept", "m.presence_accept",
lambda origin, content: self.accept_presence( lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]), observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]), observer_user=UserID.from_string(content["observer_user"]),
) )
) )
self.federation.register_edu_handler( self.replication.register_edu_handler(
"m.presence_deny", "m.presence_deny",
lambda origin, content: self.deny_presence( lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]), observed_user=UserID.from_string(content["observed_user"]),

View File

@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler):
self.server_name = hs.config.server_name self.server_name = hs.config.server_name
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.hs = hs self.hs = hs
self.federation = hs.get_replication_layer() self.federation = hs.get_federation_sender()
self.federation.register_edu_handler( hs.get_replication_layer().register_edu_handler(
"m.receipt", self._received_remote_receipt "m.receipt", self._received_remote_receipt
) )
self.clock = self.hs.get_clock() self.clock = self.hs.get_clock()

View File

@ -55,9 +55,9 @@ class TypingHandler(object):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.wheel_timer = WheelTimer(bucket_size=5000) self.wheel_timer = WheelTimer(bucket_size=5000)
self.federation = hs.get_replication_layer() self.federation = hs.get_federation_sender()
self.federation.register_edu_handler("m.typing", self._recv_edu) hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
hs.get_distributor().observe("user_left_room", self.user_left_room) hs.get_distributor().observe("user_left_room", self.user_left_room)