Limit how often we ask for keys from dead servers
This commit is contained in:
parent
0c057736ac
commit
ad816b0add
|
@ -22,6 +22,7 @@ from twisted.internet import defer
|
||||||
from synapse.api.errors import SynapseError, CodeMessageException
|
from synapse.api.errors import SynapseError, CodeMessageException
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
|
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
|
||||||
|
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -88,18 +89,28 @@ class E2eKeysHandler(object):
|
||||||
def do_remote_query(destination):
|
def do_remote_query(destination):
|
||||||
destination_query = remote_queries[destination]
|
destination_query = remote_queries[destination]
|
||||||
try:
|
try:
|
||||||
remote_result = yield self.federation.query_client_keys(
|
limiter = yield get_retry_limiter(
|
||||||
destination,
|
destination, self.clock, self.store
|
||||||
{"device_keys": destination_query},
|
|
||||||
timeout=timeout
|
|
||||||
)
|
)
|
||||||
|
with limiter:
|
||||||
|
remote_result = yield self.federation.query_client_keys(
|
||||||
|
destination,
|
||||||
|
{"device_keys": destination_query},
|
||||||
|
timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
for user_id, keys in remote_result["device_keys"].items():
|
for user_id, keys in remote_result["device_keys"].items():
|
||||||
if user_id in destination_query:
|
if user_id in destination_query:
|
||||||
results[user_id] = keys
|
results[user_id] = keys
|
||||||
|
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
failures[destination] = {
|
failures[destination] = {
|
||||||
"status": e.code, "message": e.message
|
"status": e.code, "message": e.message
|
||||||
}
|
}
|
||||||
|
except NotRetryingDestination as e:
|
||||||
|
failures[destination] = {
|
||||||
|
"status": 503, "message": "Not ready for retry",
|
||||||
|
}
|
||||||
|
|
||||||
yield preserve_context_over_deferred(defer.gatherResults([
|
yield preserve_context_over_deferred(defer.gatherResults([
|
||||||
preserve_fn(do_remote_query)(destination)
|
preserve_fn(do_remote_query)(destination)
|
||||||
|
@ -191,18 +202,26 @@ class E2eKeysHandler(object):
|
||||||
def claim_client_keys(destination):
|
def claim_client_keys(destination):
|
||||||
device_keys = remote_queries[destination]
|
device_keys = remote_queries[destination]
|
||||||
try:
|
try:
|
||||||
remote_result = yield self.federation.claim_client_keys(
|
limiter = yield get_retry_limiter(
|
||||||
destination,
|
destination, self.clock, self.store
|
||||||
{"one_time_keys": device_keys},
|
|
||||||
timeout=timeout
|
|
||||||
)
|
)
|
||||||
for user_id, keys in remote_result["one_time_keys"].items():
|
with limiter:
|
||||||
if user_id in device_keys:
|
remote_result = yield self.federation.claim_client_keys(
|
||||||
json_result[user_id] = keys
|
destination,
|
||||||
|
{"one_time_keys": device_keys},
|
||||||
|
timeout=timeout
|
||||||
|
)
|
||||||
|
for user_id, keys in remote_result["one_time_keys"].items():
|
||||||
|
if user_id in device_keys:
|
||||||
|
json_result[user_id] = keys
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
failures[destination] = {
|
failures[destination] = {
|
||||||
"status": e.code, "message": e.message
|
"status": e.code, "message": e.message
|
||||||
}
|
}
|
||||||
|
except NotRetryingDestination as e:
|
||||||
|
failures[destination] = {
|
||||||
|
"status": 503, "message": "Not ready for retry",
|
||||||
|
}
|
||||||
|
|
||||||
yield preserve_context_over_deferred(defer.gatherResults([
|
yield preserve_context_over_deferred(defer.gatherResults([
|
||||||
preserve_fn(claim_client_keys)(destination)
|
preserve_fn(claim_client_keys)(destination)
|
||||||
|
|
Loading…
Reference in New Issue