Check if destination is ready for retry earlier
This commit is contained in:
parent
4598682b43
commit
a6c6750166
|
@ -192,6 +192,12 @@ class TransactionQueue(object):
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||||
|
|
||||||
|
limiter = yield get_retry_limiter(
|
||||||
|
destination,
|
||||||
|
self.clock,
|
||||||
|
self.store,
|
||||||
|
)
|
||||||
|
|
||||||
device_message_edus, device_stream_id = (
|
device_message_edus, device_stream_id = (
|
||||||
yield self._get_new_device_messages(destination)
|
yield self._get_new_device_messages(destination)
|
||||||
)
|
)
|
||||||
|
@ -212,10 +218,18 @@ class TransactionQueue(object):
|
||||||
success = yield self._send_new_transaction(
|
success = yield self._send_new_transaction(
|
||||||
destination, pending_pdus, pending_edus, pending_failures,
|
destination, pending_pdus, pending_edus, pending_failures,
|
||||||
device_stream_id,
|
device_stream_id,
|
||||||
should_delete_from_device_stream=bool(device_message_edus)
|
should_delete_from_device_stream=bool(device_message_edus),
|
||||||
|
limiter=limiter,
|
||||||
)
|
)
|
||||||
if not success:
|
if not success:
|
||||||
break
|
break
|
||||||
|
except NotRetryingDestination:
|
||||||
|
logger.info(
|
||||||
|
"TX [%s] not ready for retry yet - "
|
||||||
|
"dropping transaction for now",
|
||||||
|
destination,
|
||||||
|
)
|
||||||
|
success = False
|
||||||
finally:
|
finally:
|
||||||
# We want to be *very* sure we delete this after we stop processing
|
# We want to be *very* sure we delete this after we stop processing
|
||||||
self.pending_transactions.pop(destination, None)
|
self.pending_transactions.pop(destination, None)
|
||||||
|
@ -242,7 +256,7 @@ class TransactionQueue(object):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
|
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
|
||||||
pending_failures, device_stream_id,
|
pending_failures, device_stream_id,
|
||||||
should_delete_from_device_stream):
|
should_delete_from_device_stream, limiter):
|
||||||
|
|
||||||
# Sort based on the order field
|
# Sort based on the order field
|
||||||
pending_pdus.sort(key=lambda t: t[1])
|
pending_pdus.sort(key=lambda t: t[1])
|
||||||
|
@ -257,12 +271,6 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
txn_id = str(self._next_txn_id)
|
txn_id = str(self._next_txn_id)
|
||||||
|
|
||||||
limiter = yield get_retry_limiter(
|
|
||||||
destination,
|
|
||||||
self.clock,
|
|
||||||
self.store,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"TX [%s] {%s} Attempting new transaction"
|
"TX [%s] {%s} Attempting new transaction"
|
||||||
" (pdus: %d, edus: %d, failures: %d)",
|
" (pdus: %d, edus: %d, failures: %d)",
|
||||||
|
@ -359,13 +367,6 @@ class TransactionQueue(object):
|
||||||
destination, device_stream_id
|
destination, device_stream_id
|
||||||
)
|
)
|
||||||
self.last_device_stream_id_by_dest[destination] = device_stream_id
|
self.last_device_stream_id_by_dest[destination] = device_stream_id
|
||||||
except NotRetryingDestination:
|
|
||||||
logger.info(
|
|
||||||
"TX [%s] not ready for retry yet - "
|
|
||||||
"dropping transaction for now",
|
|
||||||
destination,
|
|
||||||
)
|
|
||||||
success = False
|
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
# We capture this here as there as nothing actually listens
|
# We capture this here as there as nothing actually listens
|
||||||
# for this finishing functions deferred.
|
# for this finishing functions deferred.
|
||||||
|
|
Loading…
Reference in New Issue