Merge pull request #7584 from matrix-org/erikj/save_and_send_fed_token_in_bg
Speed up processing of federation stream RDATA rows.
This commit is contained in:
commit
8c5f88fa4d
|
@ -0,0 +1 @@
|
||||||
|
Speed up processing of federation stream RDATA rows.
|
|
@ -863,9 +863,24 @@ class FederationSenderHandler(object):
|
||||||
a FEDERATION_ACK back to the master, and stores the token that we have processed
|
a FEDERATION_ACK back to the master, and stores the token that we have processed
|
||||||
in `federation_stream_position` so that we can restart where we left off.
|
in `federation_stream_position` so that we can restart where we left off.
|
||||||
"""
|
"""
|
||||||
try:
|
self.federation_position = token
|
||||||
self.federation_position = token
|
|
||||||
|
|
||||||
|
# We save and send the ACK to master asynchronously, so we don't block
|
||||||
|
# processing on persistence. We don't need to do this operation for
|
||||||
|
# every single RDATA we receive, we just need to do it periodically.
|
||||||
|
|
||||||
|
if self._fed_position_linearizer.is_queued(None):
|
||||||
|
# There is already a task queued up to save and send the token, so
|
||||||
|
# no need to queue up another task.
|
||||||
|
return
|
||||||
|
|
||||||
|
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
|
||||||
|
|
||||||
|
async def _save_and_send_ack(self):
|
||||||
|
"""Save the current federation position in the database and send an ACK
|
||||||
|
to master with where we're up to.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
# We linearize here to ensure we don't have races updating the token
|
# We linearize here to ensure we don't have races updating the token
|
||||||
#
|
#
|
||||||
# XXX this appears to be redundant, since the ReplicationCommandHandler
|
# XXX this appears to be redundant, since the ReplicationCommandHandler
|
||||||
|
@ -875,16 +890,18 @@ class FederationSenderHandler(object):
|
||||||
# we're not being re-entered?
|
# we're not being re-entered?
|
||||||
|
|
||||||
with (await self._fed_position_linearizer.queue(None)):
|
with (await self._fed_position_linearizer.queue(None)):
|
||||||
|
# We persist and ack the same position, so we take a copy of it
|
||||||
|
# here as otherwise it can get modified from underneath us.
|
||||||
|
current_position = self.federation_position
|
||||||
|
|
||||||
await self.store.update_federation_out_pos(
|
await self.store.update_federation_out_pos(
|
||||||
"federation", self.federation_position
|
"federation", current_position
|
||||||
)
|
)
|
||||||
|
|
||||||
# We ACK this token over replication so that the master can drop
|
# We ACK this token over replication so that the master can drop
|
||||||
# its in memory queues
|
# its in memory queues
|
||||||
self._hs.get_tcp_replication().send_federation_ack(
|
self._hs.get_tcp_replication().send_federation_ack(current_position)
|
||||||
self.federation_position
|
self._last_ack = current_position
|
||||||
)
|
|
||||||
self._last_ack = self.federation_position
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error updating federation stream position")
|
logger.exception("Error updating federation stream position")
|
||||||
|
|
||||||
|
|
|
@ -225,6 +225,18 @@ class Linearizer(object):
|
||||||
{}
|
{}
|
||||||
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
|
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
|
||||||
|
|
||||||
|
def is_queued(self, key) -> bool:
|
||||||
|
"""Checks whether there is a process queued up waiting for the given key.
|
||||||
|
"""
|
||||||
|
entry = self.key_to_defer.get(key)
|
||||||
|
if not entry:
|
||||||
|
# No entry so nothing is waiting.
|
||||||
|
return False
|
||||||
|
|
||||||
|
# There are waiting deferreds only if the OrderedDict of deferreds is
|
||||||
|
# non-empty.
|
||||||
|
return bool(entry[1])
|
||||||
|
|
||||||
def queue(self, key):
|
def queue(self, key):
|
||||||
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
|
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
|
||||||
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
|
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
|
||||||
|
|
|
@ -45,6 +45,38 @@ class LinearizerTestCase(unittest.TestCase):
|
||||||
with (yield d2):
|
with (yield d2):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_linearizer_is_queued(self):
|
||||||
|
linearizer = Linearizer()
|
||||||
|
|
||||||
|
key = object()
|
||||||
|
|
||||||
|
d1 = linearizer.queue(key)
|
||||||
|
cm1 = yield d1
|
||||||
|
|
||||||
|
# Since d1 gets called immediately, "is_queued" should return false.
|
||||||
|
self.assertFalse(linearizer.is_queued(key))
|
||||||
|
|
||||||
|
d2 = linearizer.queue(key)
|
||||||
|
self.assertFalse(d2.called)
|
||||||
|
|
||||||
|
# Now d2 is queued up behind successful completion of cm1
|
||||||
|
self.assertTrue(linearizer.is_queued(key))
|
||||||
|
|
||||||
|
with cm1:
|
||||||
|
self.assertFalse(d2.called)
|
||||||
|
|
||||||
|
# cm1 still not done, so d2 still queued.
|
||||||
|
self.assertTrue(linearizer.is_queued(key))
|
||||||
|
|
||||||
|
# And now d2 is called and nothing is in the queue again
|
||||||
|
self.assertFalse(linearizer.is_queued(key))
|
||||||
|
|
||||||
|
with (yield d2):
|
||||||
|
self.assertFalse(linearizer.is_queued(key))
|
||||||
|
|
||||||
|
self.assertFalse(linearizer.is_queued(key))
|
||||||
|
|
||||||
def test_lots_of_queued_things(self):
|
def test_lots_of_queued_things(self):
|
||||||
# we have one slow thing, and lots of fast things queued up behind it.
|
# we have one slow thing, and lots of fast things queued up behind it.
|
||||||
# it should *not* explode the stack.
|
# it should *not* explode the stack.
|
||||||
|
|
Loading…
Reference in New Issue