Merge pull request #1092 from matrix-org/erikj/transaction_queue_check

Check if destination is ready for retry earlier
This commit is contained in:
Erik Johnston 2016-09-09 13:49:56 +01:00 committed by GitHub
commit 685da5a3b0
1 changed files with 16 additions and 15 deletions

View File

@ -192,6 +192,12 @@ class TransactionQueue(object):
pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, [])
limiter = yield get_retry_limiter(
destination,
self.clock,
self.store,
)
device_message_edus, device_stream_id = ( device_message_edus, device_stream_id = (
yield self._get_new_device_messages(destination) yield self._get_new_device_messages(destination)
) )
@ -212,10 +218,18 @@ class TransactionQueue(object):
success = yield self._send_new_transaction( success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures, destination, pending_pdus, pending_edus, pending_failures,
device_stream_id, device_stream_id,
should_delete_from_device_stream=bool(device_message_edus) should_delete_from_device_stream=bool(device_message_edus),
limiter=limiter,
) )
if not success: if not success:
break break
except NotRetryingDestination:
logger.info(
"TX [%s] not ready for retry yet - "
"dropping transaction for now",
destination,
)
success = False
finally: finally:
# We want to be *very* sure we delete this after we stop processing # We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None) self.pending_transactions.pop(destination, None)
@ -242,7 +256,7 @@ class TransactionQueue(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus, def _send_new_transaction(self, destination, pending_pdus, pending_edus,
pending_failures, device_stream_id, pending_failures, device_stream_id,
should_delete_from_device_stream): should_delete_from_device_stream, limiter):
# Sort based on the order field # Sort based on the order field
pending_pdus.sort(key=lambda t: t[1]) pending_pdus.sort(key=lambda t: t[1])
@ -257,12 +271,6 @@ class TransactionQueue(object):
txn_id = str(self._next_txn_id) txn_id = str(self._next_txn_id)
limiter = yield get_retry_limiter(
destination,
self.clock,
self.store,
)
logger.debug( logger.debug(
"TX [%s] {%s} Attempting new transaction" "TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)", " (pdus: %d, edus: %d, failures: %d)",
@ -359,13 +367,6 @@ class TransactionQueue(object):
destination, device_stream_id destination, device_stream_id
) )
self.last_device_stream_id_by_dest[destination] = device_stream_id self.last_device_stream_id_by_dest[destination] = device_stream_id
except NotRetryingDestination:
logger.info(
"TX [%s] not ready for retry yet - "
"dropping transaction for now",
destination,
)
success = False
except RuntimeError as e: except RuntimeError as e:
# We capture this here as there as nothing actually listens # We capture this here as there as nothing actually listens
# for this finishing functions deferred. # for this finishing functions deferred.