diff --git a/changelog.d/14473.misc b/changelog.d/14473.misc
new file mode 100644
index 0000000000..deccd4e91a
--- /dev/null
+++ b/changelog.d/14473.misc
@@ -0,0 +1 @@
+Faster remote room joins: stream the un-partial-stating of rooms over replication.
\ No newline at end of file
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b1e55e1b9e..d4750a32e6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
         # Check if we are partially joining any rooms. If so we need to store
         # all device list updates so that we can handle them correctly once we
         # know who is in the room.
-        # TODO(faster joins): this fetches and processes a bunch of data that we don't
+        # TODO(faster_joins): this fetches and processes a bunch of data that we don't
         # use. Could be replaced by a tighter query e.g.
         #   SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
         partial_rooms = await self.store.get_partial_state_room_resync_info()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d92582fd5c..3398fcaf7d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -152,6 +152,7 @@ class FederationHandler:
         self._federation_event_handler = hs.get_federation_event_handler()
         self._device_handler = hs.get_device_handler()
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
+        self._notifier = hs.get_notifier()
 
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
@@ -1692,6 +1693,9 @@ class FederationHandler:
                     self._storage_controllers.state.notify_room_un_partial_stated(
                         room_id
                     )
+                    # Poke the notifier so that other workers see the write to
+                    # the un-partial-stated rooms stream.
+                    self._notifier.notify_replication()
 
                     # TODO(faster_joins) update room stats and user directory?
                     #   https://github.com/matrix-org/synapse/issues/12814
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index b1cd55bf6f..8575666d9c 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import (
 )
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.replication.tcp.streams.federation import FederationStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
 
 STREAMS_MAP = {
     stream.NAME: stream
@@ -61,6 +62,7 @@ STREAMS_MAP = {
         TagAccountDataStream,
         AccountDataStream,
         UserSignatureStream,
+        UnPartialStatedRoomStream,
     )
 }
 
@@ -80,4 +82,5 @@ __all__ = [
     "TagAccountDataStream",
     "AccountDataStream",
     "UserSignatureStream",
+    "UnPartialStatedRoomStream",
 ]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
new file mode 100644
index 0000000000..18f087ffa2
--- /dev/null
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -0,0 +1,48 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING
+
+import attr
+
+from synapse.replication.tcp.streams import Stream
+from synapse.replication.tcp.streams._base import current_token_without_instance
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedRoomStreamRow:
+    # ID of the room that has been un-partial-stated.
+    room_id: str
+
+
+class UnPartialStatedRoomStream(Stream):
+    """
+    Stream to notify about rooms becoming un-partial-stated;
+    that is, when the background sync finishes such that we now have full state for
+    the room.
+    """
+
+    NAME = "un_partial_stated_room"
+    ROW_TYPE = UnPartialStatedRoomStreamRow
+
+    def __init__(self, hs: "HomeServer"):
+        store = hs.get_datastores().main
+        super().__init__(
+            hs.get_instance_name(),
+            # TODO(faster_joins, multiple writers): we need to account for instance names
+            current_token_without_instance(store.get_un_partial_stated_rooms_token),
+            store.get_un_partial_stated_rooms_from_stream,
+        )
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 1309bfd374..78906a5e1d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1,5 +1,5 @@
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -50,8 +50,14 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import IdGenerator
+from synapse.storage.util.id_generators import (
+    AbstractStreamIdGenerator,
+    IdGenerator,
+    MultiWriterIdGenerator,
+    StreamIdGenerator,
+)
 from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         self.config: HomeServerConfig = hs.config
 
+        self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
+
+        if isinstance(database.engine, PostgresEngine):
+            self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="un_partial_stated_room_stream",
+                instance_name=self._instance_name,
+                tables=[
+                    ("un_partial_stated_room_stream", "instance_name", "stream_id")
+                ],
+                sequence_name="un_partial_stated_room_stream_sequence",
+                # TODO(faster_joins, multiple writers) Support multiple writers.
+                writers=["master"],
+            )
+        else:
+            self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
+                db_conn, "un_partial_stated_room_stream", "stream_id"
+            )
+
     async def store_room(
         self,
         room_id: str,
@@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         return room_servers
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
-        """Clears the partial state flag for a room.
-
-        Args:
-            room_id: The room whose partial state flag is to be cleared.
-
-        Returns:
-            `True` if the partial state flag has been cleared successfully.
-
-            `False` if the partial state flag could not be cleared because the room
-            still contains events with partial state.
-        """
-        try:
-            await self.db_pool.runInteraction(
-                "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
-            )
-            return True
-        except self.db_pool.engine.module.IntegrityError as e:
-            # Assume that any `IntegrityError`s are due to partial state events.
-            logger.info(
-                "Exception while clearing lazy partial-state-room %s, retrying: %s",
-                room_id,
-                e,
-            )
-            return False
-
-    def _clear_partial_state_room_txn(
-        self, txn: LoggingTransaction, room_id: str
-    ) -> None:
-        DatabasePool.simple_delete_txn(
-            txn,
-            table="partial_state_rooms_servers",
-            keyvalues={"room_id": room_id},
-        )
-        DatabasePool.simple_delete_one_txn(
-            txn,
-            table="partial_state_rooms",
-            keyvalues={"room_id": room_id},
-        )
-        self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
-        self._invalidate_cache_and_stream(
-            txn, self.get_partial_state_servers_at_join, (room_id,)
-        )
-
-        # We now delete anything from `device_lists_remote_pending` with a
-        # stream ID less than the minimum
-        # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
-        device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
-            txn,
-            table="partial_state_rooms",
-            keyvalues={},
-            retcol="MIN(device_lists_stream_id)",
-            allow_none=True,
-        )
-        if device_lists_stream_id is None:
-            # There are no rooms being currently partially joined, so we delete everything.
-            txn.execute("DELETE FROM device_lists_remote_pending")
-        else:
-            sql = """
-                DELETE FROM device_lists_remote_pending
-                WHERE stream_id <= ?
-            """
-            txn.execute(sql, (device_lists_stream_id,))
-
     @cached()
     async def is_partial_state_room(self, room_id: str) -> bool:
         """Checks if this room has partial state.
@@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         )
         return result["join_event_id"], result["device_lists_stream_id"]
 
+    def get_un_partial_stated_rooms_token(self) -> int:
+        # TODO(faster_joins, multiple writers): This is inappropriate if there
+        #     are multiple writers because workers that don't write often will
+        #     hold all readers up.
+        #     (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
+        #      explanation.)
+        return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
+
+    async def get_un_partial_stated_rooms_from_stream(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+        """Get updates for caches replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_un_partial_stated_rooms_from_stream_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+            sql = """
+                SELECT stream_id, room_id
+                FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, instance_name, limit))
+            updates = [(row[0], (row[1],)) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_from_stream",
+            get_un_partial_stated_rooms_from_stream_txn,
+        )
+
 
 class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
 
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
 
+        self._instance_name = hs.get_instance_name()
+
     async def upsert_room_on_join(
         self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
     ) -> None:
@@ -2270,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             self.is_room_blocked,
             (room_id,),
         )
+
+    async def clear_partial_state_room(self, room_id: str) -> bool:
+        """Clears the partial state flag for a room.
+
+        Args:
+            room_id: The room whose partial state flag is to be cleared.
+
+        Returns:
+            `True` if the partial state flag has been cleared successfully.
+
+            `False` if the partial state flag could not be cleared because the room
+            still contains events with partial state.
+        """
+        try:
+            async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
+                await self.db_pool.runInteraction(
+                    "clear_partial_state_room",
+                    self._clear_partial_state_room_txn,
+                    room_id,
+                    un_partial_state_room_stream_id,
+                )
+                return True
+        except self.db_pool.engine.module.IntegrityError as e:
+            # Assume that any `IntegrityError`s are due to partial state events.
+            logger.info(
+                "Exception while clearing lazy partial-state-room %s, retrying: %s",
+                room_id,
+                e,
+            )
+            return False
+
+    def _clear_partial_state_room_txn(
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        un_partial_state_room_stream_id: int,
+    ) -> None:
+        DatabasePool.simple_delete_txn(
+            txn,
+            table="partial_state_rooms_servers",
+            keyvalues={"room_id": room_id},
+        )
+        DatabasePool.simple_delete_one_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={"room_id": room_id},
+        )
+        self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
+        self._invalidate_cache_and_stream(
+            txn, self.get_partial_state_servers_at_join, (room_id,)
+        )
+
+        DatabasePool.simple_insert_txn(
+            txn,
+            "un_partial_stated_room_stream",
+            {
+                "stream_id": un_partial_state_room_stream_id,
+                "instance_name": self._instance_name,
+                "room_id": room_id,
+            },
+        )
+
+        # We now delete anything from `device_lists_remote_pending` with a
+        # stream ID less than the minimum
+        # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+        device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={},
+            retcol="MIN(device_lists_stream_id)",
+            allow_none=True,
+        )
+        if device_lists_stream_id is None:
+            # There are no rooms being currently partially joined, so we delete everything.
+            txn.execute("DELETE FROM device_lists_remote_pending")
+        else:
+            sql = """
+                DELETE FROM device_lists_remote_pending
+                WHERE stream_id <= ?
+            """
+            txn.execute(sql, (device_lists_stream_id,))
diff --git a/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
new file mode 100644
index 0000000000..743196cfe3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
@@ -0,0 +1,32 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that a room has become un-partial-stated.
+CREATE TABLE un_partial_stated_room_stream(
+    -- Position in the stream
+    stream_id BIGINT PRIMARY KEY NOT NULL,
+
+    -- Which instance wrote this entry.
+    instance_name TEXT NOT NULL,
+
+    -- Which room has been un-partial-stated.
+    room_id TEXT NOT NULL REFERENCES rooms(room_id) ON DELETE CASCADE
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting a room, the database needs to be able to check here.
+-- This index is not unique because we can join a room multiple times in a server's lifetime,
+-- so the same room could be un-partial-stated multiple times!
+CREATE INDEX un_partial_stated_room_stream_room_id ON un_partial_stated_room_stream (room_id);
diff --git a/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..c1aac0b385
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_room_stream_sequence;
+
+SELECT setval('un_partial_stated_room_stream_sequence', (
+    SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_room_stream
+));