Merge pull request #948 from matrix-org/markjh/auth_fixes

Don't add rejections to the state_group, persist all rejections
This commit is contained in:
Mark Haines 2016-07-26 13:22:57 +01:00 committed by GitHub
commit 9c4cf83259
1 changed files with 110 additions and 50 deletions

View File

@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore):
@log_function @log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled): def _persist_events_txn(self, txn, events_and_contexts, backfilled):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
"""
depth_updates = {} depth_updates = {}
for event, context in events_and_contexts: for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids # Remove the any existing cache entries for the event_ids
@ -407,21 +413,11 @@ class EventsStore(SQLBaseStore):
event.room_id, event.internal_metadata.stream_ordering, event.room_id, event.internal_metadata.stream_ordering,
) )
if not event.internal_metadata.is_outlier(): if not event.internal_metadata.is_outlier() and not context.rejected:
depth_updates[event.room_id] = max( depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth) event.depth, depth_updates.get(event.room_id, event.depth)
) )
if context.push_actions:
self._set_push_actions_for_event_and_users_txn(
txn, event, context.push_actions
)
if event.type == EventTypes.Redaction and event.redacts is not None:
self._remove_push_actions_for_event_id_txn(
txn, event.room_id, event.redacts
)
for room_id, depth in depth_updates.items(): for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth) self._update_min_depth_for_room_txn(txn, room_id, depth)
@ -431,14 +427,24 @@ class EventsStore(SQLBaseStore):
), ),
[event.event_id for event, _ in events_and_contexts] [event.event_id for event, _ in events_and_contexts]
) )
have_persisted = { have_persisted = {
event_id: outlier event_id: outlier
for event_id, outlier in txn.fetchall() for event_id, outlier in txn.fetchall()
} }
# Remove the events that we've seen before.
event_map = {} event_map = {}
to_remove = set() to_remove = set()
for event, context in events_and_contexts: for event, context in events_and_contexts:
if context.rejected:
# If the event is rejected then we don't care if the event
# was an outlier or not.
if event.event_id in have_persisted:
# If we have already seen the event then ignore it.
to_remove.add(event)
continue
# Handle the case of the list including the same event multiple # Handle the case of the list including the same event multiple
# times. The tricky thing here is when they differ by whether # times. The tricky thing here is when they differ by whether
# they are an outlier. # they are an outlier.
@ -463,6 +469,12 @@ class EventsStore(SQLBaseStore):
outlier_persisted = have_persisted[event.event_id] outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted: if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that
# so we need to update the state_groups table with that state.
# insert into the state_group, state_groups_state and
# event_to_state_groups tables.
self._store_mult_state_groups_txn(txn, ((event, context),)) self._store_mult_state_groups_txn(txn, ((event, context),))
metadata_json = encode_json( metadata_json = encode_json(
@ -478,6 +490,8 @@ class EventsStore(SQLBaseStore):
(metadata_json, event.event_id,) (metadata_json, event.event_id,)
) )
# Add an entry to the ex_outlier_stream table to replicate the
# change in outlier status to our workers.
stream_order = event.internal_metadata.stream_ordering stream_order = event.internal_metadata.stream_ordering
state_group_id = context.state_group or context.new_state_group_id state_group_id = context.state_group or context.new_state_group_id
self._simple_insert_txn( self._simple_insert_txn(
@ -499,6 +513,8 @@ class EventsStore(SQLBaseStore):
(False, event.event_id,) (False, event.event_id,)
) )
# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
self._update_extremeties(txn, [event]) self._update_extremeties(txn, [event])
events_and_contexts = [ events_and_contexts = [
@ -506,38 +522,12 @@ class EventsStore(SQLBaseStore):
] ]
if not events_and_contexts: if not events_and_contexts:
# Make sure we don't pass an empty list to functions that expect to
# be storing at least one element.
return return
self._store_mult_state_groups_txn(txn, events_and_contexts) # From this point onwards the events are only events that we haven't
# seen before.
self._handle_mult_prev_events(
txn,
events=[event for event, _ in events_and_contexts],
)
for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Message:
self._store_room_message_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
elif event.type == EventTypes.RoomHistoryVisibility:
self._store_history_visibility_txn(txn, event)
elif event.type == EventTypes.GuestAccess:
self._store_guest_access_txn(txn, event)
self._store_room_members_txn(
txn,
[
event
for event, _ in events_and_contexts
if event.type == EventTypes.Member
],
backfilled=backfilled,
)
def event_dict(event): def event_dict(event):
return { return {
@ -591,10 +581,41 @@ class EventsStore(SQLBaseStore):
], ],
) )
if context.rejected: # Remove the rejected events from the list now that we've added them
self._store_rejections_txn( # to the events table and the events_json table.
txn, event.event_id, context.rejected to_remove = set()
) for event, context in events_and_contexts:
if context.rejected:
# Insert the event_id into the rejections table
self._store_rejections_txn(
txn, event.event_id, context.rejected
)
to_remove.add(event)
events_and_contexts = [
ec for ec in events_and_contexts if ec[0] not in to_remove
]
if not events_and_contexts:
# Make sure we don't pass an empty list to functions that expect to
# be storing at least one element.
return
# From this point onwards the events are only ones that weren't rejected.
for event, context in events_and_contexts:
# Insert all the push actions into the event_push_actions table.
if context.push_actions:
self._set_push_actions_for_event_and_users_txn(
txn, event, context.push_actions
)
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
# redacted event.
self._remove_push_actions_for_event_id_txn(
txn, event.room_id, event.redacts
)
self._simple_insert_many_txn( self._simple_insert_many_txn(
txn, txn,
@ -610,6 +631,49 @@ class EventsStore(SQLBaseStore):
], ],
) )
# Insert into the state_groups, state_groups_state, and
# event_to_state_groups tables.
self._store_mult_state_groups_txn(txn, events_and_contexts)
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
txn,
events=[event for event, _ in events_and_contexts],
)
for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
# Insert into the room_names and event_search tables.
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
# Insert into the topics table and event_search table.
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Message:
# Insert into the event_search table.
self._store_room_message_txn(txn, event)
elif event.type == EventTypes.Redaction:
# Insert into the redactions table.
self._store_redaction(txn, event)
elif event.type == EventTypes.RoomHistoryVisibility:
# Insert into the event_search table.
self._store_history_visibility_txn(txn, event)
elif event.type == EventTypes.GuestAccess:
# Insert into the event_search table.
self._store_guest_access_txn(txn, event)
# Insert into the room_memberships table.
self._store_room_members_txn(
txn,
[
event
for event, _ in events_and_contexts
if event.type == EventTypes.Member
],
backfilled=backfilled,
)
# Insert event_reference_hashes table.
self._store_event_reference_hashes_txn( self._store_event_reference_hashes_txn(
txn, [event for event, _ in events_and_contexts] txn, [event for event, _ in events_and_contexts]
) )
@ -654,6 +718,7 @@ class EventsStore(SQLBaseStore):
], ],
) )
# Prefill the event cache
self._add_to_cache(txn, events_and_contexts) self._add_to_cache(txn, events_and_contexts)
if backfilled: if backfilled:
@ -666,11 +731,6 @@ class EventsStore(SQLBaseStore):
# Outlier events shouldn't clobber the current state. # Outlier events shouldn't clobber the current state.
continue continue
if context.rejected:
# If the event failed it's auth checks then it shouldn't
# clobbler the current state.
continue
txn.call_after( txn.call_after(
self._get_current_state_for_key.invalidate, self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,) (event.room_id, event.type, event.state_key,)