mirror of
https://github.com/matrix-org/synapse.git
synced 2025-01-10 02:06:42 +00:00
Change the way we get missing auth and state events
This commit is contained in:
parent
b8849c8cbf
commit
07699b5871
@ -281,6 +281,22 @@ class ReplicationLayer(object):
|
|||||||
|
|
||||||
defer.returnValue(pdus)
|
defer.returnValue(pdus)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
@log_function
|
||||||
|
def get_event_auth(self, destination, context, event_id):
|
||||||
|
res = yield self.transport_layer.get_event_auth(
|
||||||
|
destination, context, event_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
auth_chain = [
|
||||||
|
self.event_from_pdu_json(p, outlier=True)
|
||||||
|
for p in res["auth_chain"]
|
||||||
|
]
|
||||||
|
|
||||||
|
auth_chain.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
|
defer.returnValue(auth_chain)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_backfill_request(self, origin, context, versions, limit):
|
def on_backfill_request(self, origin, context, versions, limit):
|
||||||
@ -549,34 +565,34 @@ class ReplicationLayer(object):
|
|||||||
state = None
|
state = None
|
||||||
|
|
||||||
# We need to make sure we have all the auth events.
|
# We need to make sure we have all the auth events.
|
||||||
for e_id, _ in pdu.auth_events:
|
# for e_id, _ in pdu.auth_events:
|
||||||
exists = yield self._get_persisted_pdu(
|
# exists = yield self._get_persisted_pdu(
|
||||||
origin,
|
# origin,
|
||||||
e_id,
|
# e_id,
|
||||||
do_auth=False
|
# do_auth=False
|
||||||
)
|
# )
|
||||||
|
#
|
||||||
if not exists:
|
# if not exists:
|
||||||
try:
|
# try:
|
||||||
logger.debug(
|
# logger.debug(
|
||||||
"_handle_new_pdu fetch missing auth event %s from %s",
|
# "_handle_new_pdu fetch missing auth event %s from %s",
|
||||||
e_id,
|
# e_id,
|
||||||
origin,
|
# origin,
|
||||||
)
|
# )
|
||||||
|
#
|
||||||
yield self.get_pdu(
|
# yield self.get_pdu(
|
||||||
origin,
|
# origin,
|
||||||
event_id=e_id,
|
# event_id=e_id,
|
||||||
outlier=True,
|
# outlier=True,
|
||||||
)
|
# )
|
||||||
|
#
|
||||||
logger.debug("Processed pdu %s", e_id)
|
# logger.debug("Processed pdu %s", e_id)
|
||||||
except:
|
# except:
|
||||||
logger.warn(
|
# logger.warn(
|
||||||
"Failed to get auth event %s from %s",
|
# "Failed to get auth event %s from %s",
|
||||||
e_id,
|
# e_id,
|
||||||
origin
|
# origin
|
||||||
)
|
# )
|
||||||
|
|
||||||
# Get missing pdus if necessary.
|
# Get missing pdus if necessary.
|
||||||
if not pdu.outlier:
|
if not pdu.outlier:
|
||||||
@ -626,6 +642,7 @@ class ReplicationLayer(object):
|
|||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
ret = yield self.handler.on_receive_pdu(
|
ret = yield self.handler.on_receive_pdu(
|
||||||
|
origin,
|
||||||
pdu,
|
pdu,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
state=state,
|
state=state,
|
||||||
|
@ -101,7 +101,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_receive_pdu(self, pdu, backfilled, state=None):
|
def on_receive_pdu(self, origin, pdu, backfilled, state=None):
|
||||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||||
do auth checks and put it through the StateHandler.
|
do auth checks and put it through the StateHandler.
|
||||||
"""
|
"""
|
||||||
@ -149,14 +149,47 @@ class FederationHandler(BaseHandler):
|
|||||||
# FIXME (erikj): Awful hack to make the case where we are not currently
|
# FIXME (erikj): Awful hack to make the case where we are not currently
|
||||||
# in the room work
|
# in the room work
|
||||||
current_state = None
|
current_state = None
|
||||||
if state:
|
is_in_room = yield self.auth.check_host_in_room(
|
||||||
is_in_room = yield self.auth.check_host_in_room(
|
event.room_id,
|
||||||
event.room_id,
|
self.server_name
|
||||||
self.server_name
|
)
|
||||||
|
if not is_in_room:
|
||||||
|
logger.debug("Got event for room we're not in.")
|
||||||
|
|
||||||
|
replication_layer = self.replication_layer
|
||||||
|
auth_chain = yield replication_layer.get_event_auth(
|
||||||
|
origin,
|
||||||
|
context=event.room_id,
|
||||||
|
event_id=event.event_id,
|
||||||
)
|
)
|
||||||
if not is_in_room:
|
|
||||||
logger.debug("Got event for room we're not in.")
|
current_state = yield replication_layer.get_state_for_context(
|
||||||
current_state = state
|
origin,
|
||||||
|
context=event.room_id,
|
||||||
|
event_id=event.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
for e in auth_chain:
|
||||||
|
e.outlier = True
|
||||||
|
try:
|
||||||
|
yield self._handle_new_event(e)
|
||||||
|
yield self.notifier.on_new_room_event(e)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse auth event %s",
|
||||||
|
e.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
for e in current_state:
|
||||||
|
e.outlier = True
|
||||||
|
try:
|
||||||
|
yield self._handle_new_event(e)
|
||||||
|
yield self.notifier.on_new_room_event(e)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse state event %s",
|
||||||
|
e.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield self._handle_new_event(
|
yield self._handle_new_event(
|
||||||
@ -328,18 +361,30 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
for e in auth_chain:
|
for e in auth_chain:
|
||||||
e.outlier = True
|
e.outlier = True
|
||||||
yield self._handle_new_event(e)
|
try:
|
||||||
yield self.notifier.on_new_room_event(
|
yield self._handle_new_event(e)
|
||||||
e, extra_users=[joinee]
|
yield self.notifier.on_new_room_event(
|
||||||
)
|
e, extra_users=[joinee]
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse auth event %s",
|
||||||
|
e.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
for e in state:
|
for e in state:
|
||||||
# FIXME: Auth these.
|
# FIXME: Auth these.
|
||||||
e.outlier = True
|
e.outlier = True
|
||||||
yield self._handle_new_event(e)
|
try:
|
||||||
yield self.notifier.on_new_room_event(
|
yield self._handle_new_event(e)
|
||||||
e, extra_users=[joinee]
|
yield self.notifier.on_new_room_event(
|
||||||
)
|
e, extra_users=[joinee]
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse state event %s",
|
||||||
|
e.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
yield self._handle_new_event(
|
yield self._handle_new_event(
|
||||||
event,
|
event,
|
||||||
|
Loading…
Reference in New Issue
Block a user