Comments
This commit is contained in:
parent
9687e039e7
commit
50934ce460
|
@ -215,6 +215,9 @@ def start(config_options):
|
||||||
|
|
||||||
|
|
||||||
class FederationSenderHandler(object):
|
class FederationSenderHandler(object):
|
||||||
|
"""Processes the replication stream and forwards the appropriate entries
|
||||||
|
to the federation sender.
|
||||||
|
"""
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
|
@ -236,16 +239,22 @@ class FederationSenderHandler(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def process_replication(self, result):
|
def process_replication(self, result):
|
||||||
|
# The federation stream contains things that we want to send out, e.g.
|
||||||
|
# presence, typing, etc.
|
||||||
fed_stream = result.get("federation")
|
fed_stream = result.get("federation")
|
||||||
if fed_stream:
|
if fed_stream:
|
||||||
latest_id = int(fed_stream["position"])
|
latest_id = int(fed_stream["position"])
|
||||||
|
|
||||||
|
# The federation stream containis a bunch of different types of
|
||||||
|
# rows that need to be handled differently. We parse the rows, put
|
||||||
|
# them into the appropriate collection and then send them off.
|
||||||
presence_to_send = {}
|
presence_to_send = {}
|
||||||
keyed_edus = {}
|
keyed_edus = {}
|
||||||
edus = {}
|
edus = {}
|
||||||
failures = {}
|
failures = {}
|
||||||
device_destinations = set()
|
device_destinations = set()
|
||||||
|
|
||||||
|
# Parse the rows in the stream
|
||||||
for row in fed_stream["rows"]:
|
for row in fed_stream["rows"]:
|
||||||
position, typ, content_js = row
|
position, typ, content_js = row
|
||||||
content = json.loads(content_js)
|
content = json.loads(content_js)
|
||||||
|
@ -276,6 +285,7 @@ class FederationSenderHandler(object):
|
||||||
else:
|
else:
|
||||||
raise Exception("Unrecognised federation type: %r", typ)
|
raise Exception("Unrecognised federation type: %r", typ)
|
||||||
|
|
||||||
|
# We've finished collecting, send everything off
|
||||||
for destination, states in presence_to_send.items():
|
for destination, states in presence_to_send.items():
|
||||||
self.federation_sender.send_presence(destination, states)
|
self.federation_sender.send_presence(destination, states)
|
||||||
|
|
||||||
|
@ -298,10 +308,12 @@ class FederationSenderHandler(object):
|
||||||
for destination in device_destinations:
|
for destination in device_destinations:
|
||||||
self.federation_sender.send_device_messages(destination)
|
self.federation_sender.send_device_messages(destination)
|
||||||
|
|
||||||
|
# Record where we are in the stream.
|
||||||
yield self.store.update_federation_out_pos(
|
yield self.store.update_federation_out_pos(
|
||||||
"federation", latest_id
|
"federation", latest_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# We also need to poke the federation sender when new events happen
|
||||||
event_stream = result.get("events")
|
event_stream = result.get("events")
|
||||||
if event_stream:
|
if event_stream:
|
||||||
latest_pos = event_stream["position"]
|
latest_pos = event_stream["position"]
|
||||||
|
|
|
@ -13,6 +13,22 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""A federation sender that forwards things to be sent across replication to
|
||||||
|
a worker process.
|
||||||
|
|
||||||
|
It assumes there is a single worker process feeding off of it.
|
||||||
|
|
||||||
|
Each row in the replication stream consists of a type and some json, where the
|
||||||
|
types indicate whether they are presence, or edus, etc.
|
||||||
|
|
||||||
|
Ephemeral or non-event data are queued up in-memory. When the worker requests
|
||||||
|
updates since a particular point, all in-memory data since before that point is
|
||||||
|
dropped. We also expire things in the queue after 5 minutes, to ensure that a
|
||||||
|
dead worker doesn't cause the queues to grow limitlessly.
|
||||||
|
|
||||||
|
Events are replicated via a separate events stream.
|
||||||
|
"""
|
||||||
|
|
||||||
from .units import Edu
|
from .units import Edu
|
||||||
|
|
||||||
from blist import sorteddict
|
from blist import sorteddict
|
||||||
|
@ -27,6 +43,7 @@ DEVICE_MESSAGE_TYPE = "d"
|
||||||
|
|
||||||
|
|
||||||
class FederationRemoteSendQueue(object):
|
class FederationRemoteSendQueue(object):
|
||||||
|
"""A drop in replacement for TransactionQueue"""
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
|
@ -58,6 +75,7 @@ class FederationRemoteSendQueue(object):
|
||||||
return pos
|
return pos
|
||||||
|
|
||||||
def _clear_queue(self):
|
def _clear_queue(self):
|
||||||
|
"""Clear the queues for anything older than N minutes"""
|
||||||
# TODO measure this function time.
|
# TODO measure this function time.
|
||||||
|
|
||||||
FIVE_MINUTES_AGO = 5 * 60 * 1000
|
FIVE_MINUTES_AGO = 5 * 60 * 1000
|
||||||
|
@ -75,6 +93,7 @@ class FederationRemoteSendQueue(object):
|
||||||
self._clear_queue_before_pos(position_to_delete)
|
self._clear_queue_before_pos(position_to_delete)
|
||||||
|
|
||||||
def _clear_queue_before_pos(self, position_to_delete):
|
def _clear_queue_before_pos(self, position_to_delete):
|
||||||
|
"""Clear all the queues from before a given position"""
|
||||||
# Delete things out of presence maps
|
# Delete things out of presence maps
|
||||||
keys = self.presence_changed.keys()
|
keys = self.presence_changed.keys()
|
||||||
i = keys.bisect_left(position_to_delete)
|
i = keys.bisect_left(position_to_delete)
|
||||||
|
@ -122,9 +141,13 @@ class FederationRemoteSendQueue(object):
|
||||||
del self.device_messages[key]
|
del self.device_messages[key]
|
||||||
|
|
||||||
def notify_new_events(self, current_id):
|
def notify_new_events(self, current_id):
|
||||||
|
"""As per TransactionQueue"""
|
||||||
|
# We don't need to replicate this as it gets sent down a different
|
||||||
|
# stream.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def send_edu(self, destination, edu_type, content, key=None):
|
def send_edu(self, destination, edu_type, content, key=None):
|
||||||
|
"""As per TransactionQueue"""
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
|
|
||||||
edu = Edu(
|
edu = Edu(
|
||||||
|
@ -142,6 +165,7 @@ class FederationRemoteSendQueue(object):
|
||||||
self.edus[pos] = edu
|
self.edus[pos] = edu
|
||||||
|
|
||||||
def send_presence(self, destination, states):
|
def send_presence(self, destination, states):
|
||||||
|
"""As per TransactionQueue"""
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
|
|
||||||
self.presence_map.update({
|
self.presence_map.update({
|
||||||
|
@ -154,11 +178,13 @@ class FederationRemoteSendQueue(object):
|
||||||
]
|
]
|
||||||
|
|
||||||
def send_failure(self, failure, destination):
|
def send_failure(self, failure, destination):
|
||||||
|
"""As per TransactionQueue"""
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
|
|
||||||
self.failures[pos] = (destination, str(failure))
|
self.failures[pos] = (destination, str(failure))
|
||||||
|
|
||||||
def send_device_messages(self, destination):
|
def send_device_messages(self, destination):
|
||||||
|
"""As per TransactionQueue"""
|
||||||
pos = self._next_pos()
|
pos = self._next_pos()
|
||||||
self.device_messages[pos] = destination
|
self.device_messages[pos] = destination
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,9 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def notify_new_events(self, current_id):
|
def notify_new_events(self, current_id):
|
||||||
|
"""This gets called when we have some new events we might want to
|
||||||
|
send out to other servers.
|
||||||
|
"""
|
||||||
self._last_poked_id = max(current_id, self._last_poked_id)
|
self._last_poked_id = max(current_id, self._last_poked_id)
|
||||||
|
|
||||||
if self._is_processing:
|
if self._is_processing:
|
||||||
|
|
Loading…
Reference in New Issue