Mirror of https://github.com/matrix-org/synapse.git, synced 2025-10-30 19:58:36 +00:00
	Fix spinloop during partial state sync when a prev event is in backoff (#15351)
Previously, we would spin in a tight loop until `update_state_for_partial_state_event` stopped raising `FederationPullAttemptBackoffError`s. Replace the spinloop with a wait until the backoff period has expired.

Signed-off-by: Sean Quah <seanq@matrix.org>
parent a3bad89d57
commit d9f694932c
changelog.d/15351.bugfix (new file, 1 addition)

@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.70.0 where the background sync from a faster join could spin for hours when one of the events involved had been marked for backoff.
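As an aside, the shape of the fix is easier to see outside the diffs below. The following is a minimal, self-contained sketch of the pattern the commit moves to, not Synapse's actual code: `BackoffError`, `process_event`, and the timing values are invented for illustration. The old behaviour was to catch the backoff error and immediately retry, spinning until the backoff window happened to elapse; the new behaviour sleeps for the remaining `retry_after_ms` before retrying.

import asyncio


class BackoffError(Exception):
    """Raised while an event is still inside its pull-backoff window."""

    def __init__(self, retry_after_ms: int) -> None:
        super().__init__(f"backing off, retry in {retry_after_ms} ms")
        self.retry_after_ms = retry_after_ms


attempts_left_in_backoff = 3


async def process_event(event_id: str) -> None:
    # Stand-in for update_state_for_partial_state_event: pretend the event
    # stays in backoff for the first few calls.
    global attempts_left_in_backoff
    if attempts_left_in_backoff > 0:
        attempts_left_in_backoff -= 1
        raise BackoffError(retry_after_ms=50)


async def sync_event(event_id: str) -> None:
    while True:
        try:
            await process_event(event_id)
            return
        except BackoffError as e:
            # Old behaviour: loop straight back around, burning CPU until the
            # backoff window elapsed. New behaviour: wait the window out.
            await asyncio.sleep(e.retry_after_ms / 1000)


asyncio.run(sync_event("$some_event_id"))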
synapse/api/errors.py

@@ -27,7 +27,7 @@ from synapse.util import json_decoder
 
 if typing.TYPE_CHECKING:
     from synapse.config.homeserver import HomeServerConfig
-    from synapse.types import JsonDict
+    from synapse.types import JsonDict, StrCollection
 
 logger = logging.getLogger(__name__)
 

@@ -682,18 +682,27 @@ class FederationPullAttemptBackoffError(RuntimeError):
     Attributes:
         event_id: The event_id which we are refusing to pull
         message: A custom error message that gives more context
+        retry_after_ms: The remaining backoff interval, in milliseconds
     """
 
-    def __init__(self, event_ids: List[str], message: Optional[str]):
-        self.event_ids = event_ids
+    def __init__(
+        self, event_ids: "StrCollection", message: Optional[str], retry_after_ms: int
+    ):
+        event_ids = list(event_ids)
 
         if message:
             error_message = message
         else:
-            error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)."
+            error_message = (
+                f"Not attempting to pull event_ids={event_ids} because we already "
+                "tried to pull them recently (backing off)."
+            )
 
         super().__init__(error_message)
 
+        self.event_ids = event_ids
+        self.retry_after_ms = retry_after_ms
+
 
 class HttpResponseException(CodeMessageException):
     """
synapse/handlers/federation.py

@@ -1949,27 +1949,25 @@ class FederationHandler:
             )
             for event in events:
                 for attempt in itertools.count():
+                    # We try a new destination on every iteration.
                     try:
-                        await self._federation_event_handler.update_state_for_partial_state_event(
-                            destination, event
-                        )
-                        break
-                    except FederationPullAttemptBackoffError as exc:
-                        # Log a warning about why we failed to process the event (the error message
-                        # for `FederationPullAttemptBackoffError` is pretty good)
-                        logger.warning("_sync_partial_state_room: %s", exc)
-                        # We do not record a failed pull attempt when we backoff fetching a missing
-                        # `prev_event` because not being able to fetch the `prev_events` just means
-                        # we won't be able to de-outlier the pulled event. But we can still use an
-                        # `outlier` in the state/auth chain for another event. So we shouldn't stop
-                        # a downstream event from trying to pull it.
-                        #
-                        # This avoids a cascade of backoff for all events in the DAG downstream from
-                        # one event backoff upstream.
-                    except FederationError as e:
-                        # TODO: We should `record_event_failed_pull_attempt` here,
-                        #   see https://github.com/matrix-org/synapse/issues/13700
+                        while True:
+                            try:
+                                await self._federation_event_handler.update_state_for_partial_state_event(
+                                    destination, event
+                                )
+                                break
+                            except FederationPullAttemptBackoffError as e:
+                                # We are in the backoff period for one of the event's
+                                # prev_events. Wait it out and try again after.
+                                logger.warning(
+                                    "%s; waiting for %d ms...", e, e.retry_after_ms
+                                )
+                                await self.clock.sleep(e.retry_after_ms / 1000)
 
+                        # Success, no need to try the rest of the destinations.
+                        break
+                    except FederationError as e:
                         if attempt == len(destinations) - 1:
                             # We have tried every remote server for this event. Give up.
                             # TODO(faster_joins) giving up isn't the right thing to do

@@ -1986,6 +1984,8 @@ class FederationHandler:
                                 destination,
                                 e,
                             )
+                            # TODO: We should `record_event_failed_pull_attempt` here,
+                            #   see https://github.com/matrix-org/synapse/issues/13700
                             raise
 
                         # Try the next remote server.
synapse/handlers/federation_event.py

@@ -140,6 +140,7 @@ class FederationEventHandler:
     """
 
     def __init__(self, hs: "HomeServer"):
+        self._clock = hs.get_clock()
         self._store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state

@@ -1038,8 +1039,8 @@ class FederationEventHandler:
 
         Raises:
             FederationPullAttemptBackoffError if we are are deliberately not attempting
-                to pull the given event over federation because we've already done so
-                recently and are backing off.
+                to pull one of the given event's `prev_event`s over federation because
+                we've already done so recently and are backing off.
             FederationError if we fail to get the state from the remote server after any
                 missing `prev_event`s.
         """

@@ -1053,13 +1054,22 @@ class FederationEventHandler:
         # If we've already recently attempted to pull this missing event, don't
         # try it again so soon. Since we have to fetch all of the prev_events, we can
         # bail early here if we find any to ignore.
-        prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
-            room_id, missing_prevs
+        prevs_with_pull_backoff = (
+            await self._store.get_event_ids_to_not_pull_from_backoff(
+                room_id, missing_prevs
+            )
         )
-        if len(prevs_to_ignore) > 0:
+        if len(prevs_with_pull_backoff) > 0:
             raise FederationPullAttemptBackoffError(
-                event_ids=prevs_to_ignore,
-                message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+                event_ids=prevs_with_pull_backoff.keys(),
+                message=(
+                    f"While computing context for event={event_id}, not attempting to "
+                    f"pull missing prev_events={list(prevs_with_pull_backoff.keys())} "
+                    "because we already tried to pull recently (backing off)."
+                ),
+                retry_after_ms=(
+                    max(prevs_with_pull_backoff.values()) - self._clock.time_msec()
+                ),
             )
 
         if not missing_prevs:
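A note on the `retry_after_ms` computation in the hunk above: it takes `max()` over the backoff end times because all of the listed `prev_event`s are needed, so retrying any earlier than the latest backoff expiry would just hit the backoff again. A tiny self-contained illustration with made-up timestamps:

# Made-up values, in milliseconds since the epoch.
now_ms = 1_000_000
prevs_with_pull_backoff = {
    "$prev_a": now_ms + 5_000,    # backoff ends in 5 seconds
    "$prev_b": now_ms + 120_000,  # backoff ends in 2 minutes
}

# Retry only once *all* listed prev_events are out of backoff, i.e. after the
# latest backoff end time has passed.
retry_after_ms = max(prevs_with_pull_backoff.values()) - now_ms
assert retry_after_ms == 120_000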
synapse/storage/databases/main/event_federation.py

@@ -1544,7 +1544,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
         self,
         room_id: str,
         event_ids: Collection[str],
-    ) -> List[str]:
+    ) -> Dict[str, int]:
         """
         Filter down the events to ones that we've failed to pull before recently. Uses
         exponential backoff.

@@ -1554,7 +1554,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
             event_ids: A list of events to filter down
 
         Returns:
-            List of event_ids that should not be attempted to be pulled
+            A dictionary of event_ids that should not be attempted to be pulled and the
+            next timestamp at which we may try pulling them again.
         """
         event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
             table="event_failed_pull_attempts",

@@ -1570,22 +1571,28 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
         )
 
         current_time = self._clock.time_msec()
-        return [
-            event_failed_pull_attempt["event_id"]
-            for event_failed_pull_attempt in event_failed_pull_attempts
+
+        event_ids_with_backoff = {}
+        for event_failed_pull_attempt in event_failed_pull_attempts:
+            event_id = event_failed_pull_attempt["event_id"]
             # Exponential back-off (up to the upper bound) so we don't try to
             # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
-            if current_time
-            < event_failed_pull_attempt["last_attempt_ts"]
-            + (
-                2
-                ** min(
-                    event_failed_pull_attempt["num_attempts"],
-                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+            backoff_end_time = (
+                event_failed_pull_attempt["last_attempt_ts"]
+                + (
+                    2
+                    ** min(
+                        event_failed_pull_attempt["num_attempts"],
+                        BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                    )
                 )
+                * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
             )
-            * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
-        ]
+
+            if current_time < backoff_end_time:  # `backoff_end_time` is exclusive
+                event_ids_with_backoff[event_id] = backoff_end_time
+
+        return event_ids_with_backoff
 
     async def get_missing_events(
         self,
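For reference, the timestamps returned by the rewritten function come from `last_attempt_ts + 2 ** min(num_attempts, cap) * step`. The sketch below is a standalone check of that arithmetic, assuming a one-hour step and a cap of 8 doubling steps (both values are assumptions here, though the one-hour step is consistent with the test below, which expects a 2^1 hour backoff after a single failure).

import datetime

# Assumed values for illustration; the real constants live in Synapse's
# event_federation storage module.
STEP_MS = int(datetime.timedelta(hours=1).total_seconds() * 1000)
MAX_DOUBLING_STEPS = 8


def backoff_end_time(last_attempt_ts: int, num_attempts: int) -> int:
    """Timestamp (ms) at which the pull backoff for a failed event expires."""
    return last_attempt_ts + (2 ** min(num_attempts, MAX_DOUBLING_STEPS)) * STEP_MS


# One failed attempt => a 2^1 hour backoff, matching the test's expectation of
# failure_time + 2 * 60 * 60 * 1000.
assert backoff_end_time(0, 1) == 2 * 60 * 60 * 1000
# Four failed attempts => 2^4 hours; beyond the cap the backoff stops growing.
assert backoff_end_time(0, 4) == 16 * 60 * 60 * 1000
assert backoff_end_time(0, 20) == backoff_end_time(0, MAX_DOUBLING_STEPS)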
tests/storage/test_event_federation.py

@@ -1143,19 +1143,24 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         tok = self.login("alice", "test")
         room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
 
+        failure_time = self.clock.time_msec()
         self.get_success(
             self.store.record_event_failed_pull_attempt(
                 room_id, "$failed_event_id", "fake cause"
             )
         )
 
-        event_ids_to_backoff = self.get_success(
+        event_ids_with_backoff = self.get_success(
             self.store.get_event_ids_to_not_pull_from_backoff(
                 room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
             )
         )
 
-        self.assertEqual(event_ids_to_backoff, ["$failed_event_id"])
+        self.assertEqual(
+            event_ids_with_backoff,
+            # We expect a 2^1 hour backoff after a single failed attempt.
+            {"$failed_event_id": failure_time + 2 * 60 * 60 * 1000},
+        )
 
     def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration(
         self,

@@ -1179,14 +1184,14 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         # attempt (2^1 hours).
         self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
 
-        event_ids_to_backoff = self.get_success(
+        event_ids_with_backoff = self.get_success(
             self.store.get_event_ids_to_not_pull_from_backoff(
                 room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
             )
         )
         # Since this function only returns events we should backoff from, time has
         # elapsed past the backoff range so there is no events to backoff from.
-        self.assertEqual(event_ids_to_backoff, [])
+        self.assertEqual(event_ids_with_backoff, {})
 
 
 @attr.s(auto_attribs=True)