mirror of
https://github.com/matrix-org/synapse.git
synced 2025-01-07 16:57:08 +00:00
Merge branch 'release-v0.5.3' of github.com:matrix-org/synapse
This commit is contained in:
commit
5e26f6f3ae
@ -1,3 +1,11 @@
|
|||||||
|
Changes in synapse 0.5.3 (2014-11-27)
|
||||||
|
=====================================
|
||||||
|
|
||||||
|
* Fix bug that caused joining a remote room to fail if a single event was not
|
||||||
|
signed correctly.
|
||||||
|
* Fix bug which caused servers to continuously try and fetch events from other
|
||||||
|
servers.
|
||||||
|
|
||||||
Changes in synapse 0.5.2 (2014-11-26)
|
Changes in synapse 0.5.2 (2014-11-26)
|
||||||
=====================================
|
=====================================
|
||||||
|
|
||||||
|
@ -16,4 +16,4 @@
|
|||||||
""" This is a reference implementation of a synapse home server.
|
""" This is a reference implementation of a synapse home server.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
__version__ = "0.5.2"
|
__version__ = "0.5.3"
|
||||||
|
@ -202,7 +202,10 @@ class Auth(object):
|
|||||||
|
|
||||||
# Invites are valid iff caller is in the room and target isn't.
|
# Invites are valid iff caller is in the room and target isn't.
|
||||||
if not caller_in_room: # caller isn't joined
|
if not caller_in_room: # caller isn't joined
|
||||||
raise AuthError(403, "You are not in room %s." % event.room_id)
|
raise AuthError(
|
||||||
|
403,
|
||||||
|
"%s not in room %s." % (event.user_id, event.room_id,)
|
||||||
|
)
|
||||||
elif target_in_room: # the target is already in the room.
|
elif target_in_room: # the target is already in the room.
|
||||||
raise AuthError(403, "%s is already in the room." %
|
raise AuthError(403, "%s is already in the room." %
|
||||||
target_user_id)
|
target_user_id)
|
||||||
@ -225,7 +228,10 @@ class Auth(object):
|
|||||||
# TODO (erikj): Implement kicks.
|
# TODO (erikj): Implement kicks.
|
||||||
|
|
||||||
if not caller_in_room: # trying to leave a room you aren't joined
|
if not caller_in_room: # trying to leave a room you aren't joined
|
||||||
raise AuthError(403, "You are not in room %s." % event.room_id)
|
raise AuthError(
|
||||||
|
403,
|
||||||
|
"%s not in room %s." % (target_user_id, event.room_id,)
|
||||||
|
)
|
||||||
elif target_user_id != event.user_id:
|
elif target_user_id != event.user_id:
|
||||||
if kick_level:
|
if kick_level:
|
||||||
kick_level = int(kick_level)
|
kick_level = int(kick_level)
|
||||||
|
@ -281,6 +281,22 @@ class ReplicationLayer(object):
|
|||||||
|
|
||||||
defer.returnValue(pdus)
|
defer.returnValue(pdus)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
@log_function
|
||||||
|
def get_event_auth(self, destination, context, event_id):
|
||||||
|
res = yield self.transport_layer.get_event_auth(
|
||||||
|
destination, context, event_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
auth_chain = [
|
||||||
|
self.event_from_pdu_json(p, outlier=True)
|
||||||
|
for p in res["auth_chain"]
|
||||||
|
]
|
||||||
|
|
||||||
|
auth_chain.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
|
defer.returnValue(auth_chain)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_backfill_request(self, origin, context, versions, limit):
|
def on_backfill_request(self, origin, context, versions, limit):
|
||||||
@ -549,34 +565,34 @@ class ReplicationLayer(object):
|
|||||||
state = None
|
state = None
|
||||||
|
|
||||||
# We need to make sure we have all the auth events.
|
# We need to make sure we have all the auth events.
|
||||||
for e_id, _ in pdu.auth_events:
|
# for e_id, _ in pdu.auth_events:
|
||||||
exists = yield self._get_persisted_pdu(
|
# exists = yield self._get_persisted_pdu(
|
||||||
origin,
|
# origin,
|
||||||
e_id,
|
# e_id,
|
||||||
do_auth=False
|
# do_auth=False
|
||||||
)
|
# )
|
||||||
|
#
|
||||||
if not exists:
|
# if not exists:
|
||||||
try:
|
# try:
|
||||||
logger.debug(
|
# logger.debug(
|
||||||
"_handle_new_pdu fetch missing auth event %s from %s",
|
# "_handle_new_pdu fetch missing auth event %s from %s",
|
||||||
e_id,
|
# e_id,
|
||||||
origin,
|
# origin,
|
||||||
)
|
# )
|
||||||
|
#
|
||||||
yield self.get_pdu(
|
# yield self.get_pdu(
|
||||||
origin,
|
# origin,
|
||||||
event_id=e_id,
|
# event_id=e_id,
|
||||||
outlier=True,
|
# outlier=True,
|
||||||
)
|
# )
|
||||||
|
#
|
||||||
logger.debug("Processed pdu %s", e_id)
|
# logger.debug("Processed pdu %s", e_id)
|
||||||
except:
|
# except:
|
||||||
logger.warn(
|
# logger.warn(
|
||||||
"Failed to get auth event %s from %s",
|
# "Failed to get auth event %s from %s",
|
||||||
e_id,
|
# e_id,
|
||||||
origin
|
# origin
|
||||||
)
|
# )
|
||||||
|
|
||||||
# Get missing pdus if necessary.
|
# Get missing pdus if necessary.
|
||||||
if not pdu.outlier:
|
if not pdu.outlier:
|
||||||
@ -626,6 +642,7 @@ class ReplicationLayer(object):
|
|||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
ret = yield self.handler.on_receive_pdu(
|
ret = yield self.handler.on_receive_pdu(
|
||||||
|
origin,
|
||||||
pdu,
|
pdu,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
state=state,
|
state=state,
|
||||||
|
@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
|
|||||||
if auth_user not in self._streams_per_user:
|
if auth_user not in self._streams_per_user:
|
||||||
self._streams_per_user[auth_user] = 0
|
self._streams_per_user[auth_user] = 0
|
||||||
if auth_user in self._stop_timer_per_user:
|
if auth_user in self._stop_timer_per_user:
|
||||||
|
try:
|
||||||
self.clock.cancel_call_later(
|
self.clock.cancel_call_later(
|
||||||
self._stop_timer_per_user.pop(auth_user))
|
self._stop_timer_per_user.pop(auth_user)
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
logger.exception("Failed to cancel event timer")
|
||||||
else:
|
else:
|
||||||
yield self.distributor.fire(
|
yield self.distributor.fire(
|
||||||
"started_user_eventstream", auth_user
|
"started_user_eventstream", auth_user
|
||||||
@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
|
|||||||
logger.debug(
|
logger.debug(
|
||||||
"_later stopped_user_eventstream %s", auth_user
|
"_later stopped_user_eventstream %s", auth_user
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._stop_timer_per_user.pop(auth_user, None)
|
||||||
|
|
||||||
yield self.distributor.fire(
|
yield self.distributor.fire(
|
||||||
"stopped_user_eventstream", auth_user
|
"stopped_user_eventstream", auth_user
|
||||||
)
|
)
|
||||||
del self._stop_timer_per_user[auth_user]
|
|
||||||
|
|
||||||
logger.debug("Scheduling _later: for %s", auth_user)
|
logger.debug("Scheduling _later: for %s", auth_user)
|
||||||
self._stop_timer_per_user[auth_user] = (
|
self._stop_timer_per_user[auth_user] = (
|
||||||
|
@ -101,7 +101,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_receive_pdu(self, pdu, backfilled, state=None):
|
def on_receive_pdu(self, origin, pdu, backfilled, state=None):
|
||||||
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
""" Called by the ReplicationLayer when we have a new pdu. We need to
|
||||||
do auth checks and put it through the StateHandler.
|
do auth checks and put it through the StateHandler.
|
||||||
"""
|
"""
|
||||||
@ -112,7 +112,7 @@ class FederationHandler(BaseHandler):
|
|||||||
# If we are currently in the process of joining this room, then we
|
# If we are currently in the process of joining this room, then we
|
||||||
# queue up events for later processing.
|
# queue up events for later processing.
|
||||||
if event.room_id in self.room_queues:
|
if event.room_id in self.room_queues:
|
||||||
self.room_queues[event.room_id].append(pdu)
|
self.room_queues[event.room_id].append((pdu, origin))
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("Processing event: %s", event.event_id)
|
logger.debug("Processing event: %s", event.event_id)
|
||||||
@ -149,15 +149,50 @@ class FederationHandler(BaseHandler):
|
|||||||
# FIXME (erikj): Awful hack to make the case where we are not currently
|
# FIXME (erikj): Awful hack to make the case where we are not currently
|
||||||
# in the room work
|
# in the room work
|
||||||
current_state = None
|
current_state = None
|
||||||
if state:
|
|
||||||
is_in_room = yield self.auth.check_host_in_room(
|
is_in_room = yield self.auth.check_host_in_room(
|
||||||
event.room_id,
|
event.room_id,
|
||||||
self.server_name
|
self.server_name
|
||||||
)
|
)
|
||||||
if not is_in_room:
|
if not is_in_room and not event.outlier:
|
||||||
logger.debug("Got event for room we're not in.")
|
logger.debug("Got event for room we're not in.")
|
||||||
|
|
||||||
|
replication_layer = self.replication_layer
|
||||||
|
auth_chain = yield replication_layer.get_event_auth(
|
||||||
|
origin,
|
||||||
|
context=event.room_id,
|
||||||
|
event_id=event.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
for e in auth_chain:
|
||||||
|
e.outlier = True
|
||||||
|
try:
|
||||||
|
yield self._handle_new_event(e, fetch_missing=False)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse auth event %s",
|
||||||
|
e.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not state:
|
||||||
|
state = yield replication_layer.get_state_for_context(
|
||||||
|
origin,
|
||||||
|
context=event.room_id,
|
||||||
|
event_id=event.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
current_state = state
|
current_state = state
|
||||||
|
|
||||||
|
if state:
|
||||||
|
for e in state:
|
||||||
|
e.outlier = True
|
||||||
|
try:
|
||||||
|
yield self._handle_new_event(e)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse state event %s",
|
||||||
|
e.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield self._handle_new_event(
|
yield self._handle_new_event(
|
||||||
event,
|
event,
|
||||||
@ -251,6 +286,16 @@ class FederationHandler(BaseHandler):
|
|||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_event_auth(self, event_id):
|
def on_event_auth(self, event_id):
|
||||||
auth = yield self.store.get_auth_chain(event_id)
|
auth = yield self.store.get_auth_chain(event_id)
|
||||||
|
|
||||||
|
for event in auth:
|
||||||
|
event.signatures.update(
|
||||||
|
compute_event_signature(
|
||||||
|
event,
|
||||||
|
self.hs.hostname,
|
||||||
|
self.hs.config.signing_key[0]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
defer.returnValue([e for e in auth])
|
defer.returnValue([e for e in auth])
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@ -310,6 +355,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
state = ret["state"]
|
state = ret["state"]
|
||||||
auth_chain = ret["auth_chain"]
|
auth_chain = ret["auth_chain"]
|
||||||
|
auth_chain.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
logger.debug("do_invite_join auth_chain: %s", auth_chain)
|
logger.debug("do_invite_join auth_chain: %s", auth_chain)
|
||||||
logger.debug("do_invite_join state: %s", state)
|
logger.debug("do_invite_join state: %s", state)
|
||||||
@ -328,23 +374,32 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
for e in auth_chain:
|
for e in auth_chain:
|
||||||
e.outlier = True
|
e.outlier = True
|
||||||
yield self._handle_new_event(e)
|
try:
|
||||||
yield self.notifier.on_new_room_event(
|
yield self._handle_new_event(e, fetch_missing=False)
|
||||||
e, extra_users=[joinee]
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse auth event %s",
|
||||||
|
e.event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
for e in state:
|
for e in state:
|
||||||
# FIXME: Auth these.
|
# FIXME: Auth these.
|
||||||
e.outlier = True
|
e.outlier = True
|
||||||
yield self._handle_new_event(e)
|
try:
|
||||||
yield self.notifier.on_new_room_event(
|
yield self._handle_new_event(
|
||||||
e, extra_users=[joinee]
|
e,
|
||||||
|
fetch_missing=True
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse state event %s",
|
||||||
|
e.event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self._handle_new_event(
|
yield self._handle_new_event(
|
||||||
event,
|
event,
|
||||||
state=state,
|
state=state,
|
||||||
current_state=state
|
current_state=state,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.notifier.on_new_room_event(
|
yield self.notifier.on_new_room_event(
|
||||||
@ -356,9 +411,9 @@ class FederationHandler(BaseHandler):
|
|||||||
room_queue = self.room_queues[room_id]
|
room_queue = self.room_queues[room_id]
|
||||||
del self.room_queues[room_id]
|
del self.room_queues[room_id]
|
||||||
|
|
||||||
for p in room_queue:
|
for p, origin in room_queue:
|
||||||
try:
|
try:
|
||||||
self.on_receive_pdu(p, backfilled=False)
|
self.on_receive_pdu(origin, p, backfilled=False)
|
||||||
except:
|
except:
|
||||||
logger.exception("Couldn't handle pdu")
|
logger.exception("Couldn't handle pdu")
|
||||||
|
|
||||||
@ -507,7 +562,17 @@ class FederationHandler(BaseHandler):
|
|||||||
else:
|
else:
|
||||||
del results[(event.type, event.state_key)]
|
del results[(event.type, event.state_key)]
|
||||||
|
|
||||||
defer.returnValue(results.values())
|
res = results.values()
|
||||||
|
for event in res:
|
||||||
|
event.signatures.update(
|
||||||
|
compute_event_signature(
|
||||||
|
event,
|
||||||
|
self.hs.hostname,
|
||||||
|
self.hs.config.signing_key[0]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue(res)
|
||||||
else:
|
else:
|
||||||
defer.returnValue([])
|
defer.returnValue([])
|
||||||
|
|
||||||
@ -540,6 +605,17 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if event:
|
if event:
|
||||||
|
# FIXME: This is a temporary work around where we occasionally
|
||||||
|
# return events slightly differently than when they were
|
||||||
|
# originally signed
|
||||||
|
event.signatures.update(
|
||||||
|
compute_event_signature(
|
||||||
|
event,
|
||||||
|
self.hs.hostname,
|
||||||
|
self.hs.config.signing_key[0]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if do_auth:
|
if do_auth:
|
||||||
in_room = yield self.auth.check_host_in_room(
|
in_room = yield self.auth.check_host_in_room(
|
||||||
event.room_id,
|
event.room_id,
|
||||||
@ -567,11 +643,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _handle_new_event(self, event, state=None, backfilled=False,
|
def _handle_new_event(self, event, state=None, backfilled=False,
|
||||||
current_state=None):
|
current_state=None, fetch_missing=True):
|
||||||
if state:
|
|
||||||
for s in state:
|
|
||||||
yield self._handle_new_event(s)
|
|
||||||
|
|
||||||
is_new_state = yield self.state_handler.annotate_event_with_state(
|
is_new_state = yield self.state_handler.annotate_event_with_state(
|
||||||
event,
|
event,
|
||||||
old_state=state
|
old_state=state
|
||||||
@ -611,11 +683,22 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if not e:
|
if not e:
|
||||||
raise AuthError(
|
e = yield self.replication_layer.get_pdu(
|
||||||
403,
|
event.origin, e_id, outlier=True
|
||||||
"Can't find auth event %s." % (e_id, )
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if e and fetch_missing:
|
||||||
|
try:
|
||||||
|
yield self.on_receive_pdu(event.origin, e, False)
|
||||||
|
except:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to parse auth event %s",
|
||||||
|
e_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not e:
|
||||||
|
logger.warn("Can't find auth event %s.", e_id)
|
||||||
|
|
||||||
auth_events[(e.type, e.state_key)] = e
|
auth_events[(e.type, e.state_key)] = e
|
||||||
|
|
||||||
if event.type == RoomMemberEvent.TYPE and not event.auth_events:
|
if event.type == RoomMemberEvent.TYPE and not event.auth_events:
|
||||||
|
@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
|
|||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT *, (%(redacted)s) AS redacted FROM events "
|
"SELECT *, (%(redacted)s) AS redacted FROM events "
|
||||||
"WHERE room_id = ? AND stream_ordering <= ? "
|
"WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
|
||||||
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
|
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
|
||||||
) % {
|
) % {
|
||||||
"redacted": del_sql,
|
"redacted": del_sql,
|
||||||
|
@ -42,6 +42,7 @@ class FederationTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
self.auth = NonCallableMock(spec_set=[
|
self.auth = NonCallableMock(spec_set=[
|
||||||
"check",
|
"check",
|
||||||
|
"check_host_in_room",
|
||||||
])
|
])
|
||||||
|
|
||||||
self.hostname = "test"
|
self.hostname = "test"
|
||||||
@ -89,13 +90,16 @@ class FederationTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
self.datastore.persist_event.return_value = defer.succeed(None)
|
self.datastore.persist_event.return_value = defer.succeed(None)
|
||||||
self.datastore.get_room.return_value = defer.succeed(True)
|
self.datastore.get_room.return_value = defer.succeed(True)
|
||||||
|
self.auth.check_host_in_room.return_value = defer.succeed(True)
|
||||||
|
|
||||||
def annotate(ev, old_state=None):
|
def annotate(ev, old_state=None):
|
||||||
ev.old_state_events = []
|
ev.old_state_events = []
|
||||||
return defer.succeed(False)
|
return defer.succeed(False)
|
||||||
self.state_handler.annotate_event_with_state.side_effect = annotate
|
self.state_handler.annotate_event_with_state.side_effect = annotate
|
||||||
|
|
||||||
yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
|
yield self.handlers.federation_handler.on_receive_pdu(
|
||||||
|
"fo", pdu, False
|
||||||
|
)
|
||||||
|
|
||||||
self.datastore.persist_event.assert_called_once_with(
|
self.datastore.persist_event.assert_called_once_with(
|
||||||
ANY, is_new_state=False, backfilled=False, current_state=None
|
ANY, is_new_state=False, backfilled=False, current_state=None
|
||||||
|
Loading…
Reference in New Issue
Block a user