Mirror of https://github.com/matrix-org/synapse.git (synced 2025-02-20 22:25:47 +00:00)

commit 7ceacaaa6e
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
@@ -1,3 +1,12 @@
+Changes in synapse v0.20.0 (2017-04-11)
+=======================================
+
+Bug fixes:
+
+* Fix joining rooms over federation where not all servers in the room saw the
+  new server had joined (PR #2094)
+
+
 Changes in synapse v0.20.0-rc1 (2017-03-30)
 ===========================================
 
@@ -26,28 +26,10 @@ expose the append-only log to the readers should be fairly minimal.
 Architecture
 ------------
 
-The Replication API
-~~~~~~~~~~~~~~~~~~~
+The Replication Protocol
+~~~~~~~~~~~~~~~~~~~~~~~~
 
-Synapse will optionally expose a long poll HTTP API for extracting updates. The
-API will have a similar shape to /sync in that clients provide tokens
-indicating where in the log they have reached and a timeout. The synapse server
-then either responds with updates immediately if it already has updates or it
-waits until the timeout for more updates. If the timeout expires and nothing
-happened then the server returns an empty response.
-
-However unlike the /sync API this replication API is returning synapse specific
-data rather than trying to implement a matrix specification. The replication
-results are returned as arrays of rows where the rows are mostly lifted
-directly from the database. This avoids unnecessary JSON parsing on the server
-and hopefully avoids an impedance mismatch between the data returned and the
-required updates to the datastore.
-
-This does not replicate all the database tables as many of the database tables
-are indexes that can be recovered from the contents of other tables.
-
-The format and parameters for the api are documented in
-``synapse/replication/resource.py``.
+See ``tcp_replication.rst``
 
 
 The Slaved DataStore
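The long-poll description deleted above is replaced by a dedicated TCP link. For orientation only (``tcp_replication.rst`` is the authoritative reference; command names and payloads below are copied from it in simplified form), the new protocol is a stream of newline-delimited ``<command> <data>`` lines; an abridged exchange between master (``>``) and worker (``<``) looks roughly like::

    > SERVER example.com
    < REPLICATE events 53
    > RDATA events 54 ["$foo1:bar.com", ...]
    > RDATA events 55 ["$foo4:bar.com", ...]

This is a sketch of the wire format, not a full specification of the handshake.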
@@ -12,7 +12,7 @@ across multiple processes is a recipe for disaster, plus you should be using
 postgres anyway if you care about scalability).
 
 The workers communicate with the master synapse process via a synapse-specific
-HTTP protocol called 'replication' - analogous to MySQL or Postgres style
+TCP protocol called 'replication' - analogous to MySQL or Postgres style
 database replication; feeding a stream of relevant data to the workers so they
 can be kept in sync with the main synapse process and database state.
 
@@ -21,16 +21,11 @@ To enable workers, you need to add a replication listener to the master synapse,
     listeners:
     - port: 9092
      bind_address: '127.0.0.1'
-      type: http
-      tls: false
-      x_forwarded: false
-      resources:
-      - names: [replication]
-        compress: false
+      type: replication
 
 Under **no circumstances** should this replication API listener be exposed to the
 public internet; it currently implements no authentication whatsoever and is
-unencrypted HTTP.
+unencrypted.
 
 You then create a set of configs for the various worker processes. These should be
 worker configuration files should be stored in a dedicated subdirectory, to allow
@@ -50,14 +45,16 @@ e.g. the HTTP listener that it provides (if any); logging configuration; etc.
 You should minimise the number of overrides though to maintain a usable config.
 
 You must specify the type of worker application (worker_app) and the replication
-endpoint that it's talking to on the main synapse process (worker_replication_url).
+endpoint that it's talking to on the main synapse process (worker_replication_host
+and worker_replication_port).
 
 For instance::
 
     worker_app: synapse.app.synchrotron
 
     # The replication listener on the synapse to talk to.
-    worker_replication_url: http://127.0.0.1:9092/_synapse/replication
+    worker_replication_host: 127.0.0.1
+    worker_replication_port: 9092
 
     worker_listeners:
     - type: http
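Pulling the pieces of these hunks together, a minimal synchrotron worker config under the new scheme might look like the sketch below. The replication port must match the master's ``replication`` listener shown earlier; the HTTP listener port and resource names here are illustrative, not prescriptive::

    worker_app: synapse.app.synchrotron

    # The replication listener on the main synapse process.
    worker_replication_host: 127.0.0.1
    worker_replication_port: 9092

    worker_listeners:
    - type: http
      port: 8083
      resources:
      - names: [client]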
@@ -95,4 +92,3 @@ To manipulate a specific worker, you pass the -w option to synctl::
 All of the above is highly experimental and subject to change as Synapse evolves,
 but documenting it here to help folks needing highly scalable Synapses similar
 to the one running matrix.org!
-
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.20.0-rc1"
+__version__ = "0.20.0"
@@ -28,6 +28,7 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
@@ -55,7 +56,7 @@ logger = logging.getLogger("synapse.app.appservice")
 
 class FederationSenderSlaveStore(
     SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore,
+    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
         super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
@@ -55,7 +55,6 @@ from synapse.crypto import context_factory
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.federation.transport.server import TransportLayerServer
 
@@ -167,9 +166,6 @@ class SynapseHomeServer(HomeServer):
         if name == "metrics" and self.get_config().enable_metrics:
             resources[METRICS_PREFIX] = MetricsResource(self)
 
-        if name == "replication":
-            resources[REPLICATION_PREFIX] = ReplicationResource(self)
-
         if WEB_CLIENT_PREFIX in resources:
             root_resource = RootRedirect(WEB_CLIENT_PREFIX)
         else:
@@ -20,7 +20,7 @@ from synapse.api.constants import EventTypes
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.handlers.presence import PresenceHandler
+from synapse.handlers.presence import PresenceHandler, get_interested_parties
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
@@ -44,7 +44,7 @@ from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
-from synapse.storage.presence import PresenceStore, UserPresenceState
+from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -89,16 +89,6 @@ class SynchrotronSlavedStore(
         RoomMemberStore.__dict__["did_forget"]
     )
 
-    # XXX: This is a bit broken because we don't persist the accepted list in a
-    # way that can be replicated. This means that we don't have a way to
-    # invalidate the cache correctly.
-    get_presence_list_accepted = PresenceStore.__dict__[
-        "get_presence_list_accepted"
-    ]
-    get_presence_list_observers_accepted = PresenceStore.__dict__[
-        "get_presence_list_observers_accepted"
-    ]
-
 
 UPDATE_SYNCING_USERS_MS = 10 * 1000
 
@@ -172,7 +162,6 @@ class SynchrotronPresence(object):
 
     get_states = PresenceHandler.get_states.__func__
     get_state = PresenceHandler.get_state.__func__
-    _get_interested_parties = PresenceHandler._get_interested_parties.__func__
    current_state_for_users = PresenceHandler.current_state_for_users.__func__
 
     def user_syncing(self, user_id, affect_presence):
@@ -206,10 +195,8 @@ class SynchrotronPresence(object):
 
     @defer.inlineCallbacks
     def notify_from_replication(self, states, stream_id):
-        parties = yield self._get_interested_parties(
-            states, calculate_remote_hosts=False
-        )
-        room_ids_to_states, users_to_states, _ = parties
+        parties = yield get_interested_parties(self.store, states)
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
@@ -53,18 +53,19 @@ class FederationRemoteSendQueue(object):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
 
-        self.presence_map = {}
-        self.presence_changed = sorteddict()
+        self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
+        self.presence_changed = sorteddict()  # Stream position -> user_id
 
-        self.keyed_edu = {}
-        self.keyed_edu_changed = sorteddict()
+        self.keyed_edu = {}  # (destination, key) -> EDU
+        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()
+        self.edus = sorteddict()  # stream position -> Edu
 
-        self.failures = sorteddict()
+        self.failures = sorteddict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()
+        self.device_messages = sorteddict()  # stream position -> destination
 
         self.pos = 1
         self.pos_time = sorteddict()
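Each queue above pairs a payload map with a ``sorteddict`` keyed by stream position, so "fetch everything between two tokens" reduces to two bisections over the sorted keys, as the ``get_replication_rows`` hunks below show. A self-contained sketch of the idiom, using the ``blist.sorteddict`` that Synapse depended on at the time (the helper name is illustrative)::

    from blist import sorteddict

    changed = sorteddict()  # stream position -> payload
    changed[1] = "edu-a"
    changed[2] = "edu-b"
    changed[5] = "edu-c"

    def rows_between(changed, from_token, to_token):
        # Mirrors the pattern in get_replication_rows: bisect the sorted
        # key view, then slice out the positions after from_token.
        keys = changed.keys()
        i = keys.bisect_right(from_token)
        j = keys.bisect_right(to_token) + 1
        return [(k, changed[k]) for k in keys[i:j]]

    print(rows_between(changed, 1, 5))  # [(2, 'edu-b'), (5, 'edu-c')]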
@@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
             del self.presence_changed[key]
 
         user_ids = set(
-            user_id for uids in self.presence_changed.values() for _, user_id in uids
+            user_id
+            for uids in self.presence_changed.itervalues()
+            for user_id in uids
         )
 
         to_del = [
@@ -187,18 +190,20 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
-    def send_presence(self, destination, states):
-        """As per TransactionQueue"""
+    def send_presence(self, states):
+        """As per TransactionQueue
+
+        Args:
+            states (list(UserPresenceState))
+        """
         pos = self._next_pos()
 
-        self.presence_map.update({
-            state.user_id: state
-            for state in states
-        })
+        # We only want to send presence for our own users, so lets always just
+        # filter here just in case.
+        local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
 
-        self.presence_changed[pos] = [
-            (destination, state.user_id) for state in states
-        ]
+        self.presence_map.update({state.user_id: state for state in local_states})
+        self.presence_changed[pos] = [state.user_id for state in local_states]
 
         self.notifier.on_new_replication_data()
 
@@ -251,15 +256,14 @@ class FederationRemoteSendQueue(object):
         keys = self.presence_changed.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        dest_user_ids = set(
-            (pos, dest_user_id)
+        dest_user_ids = [
+            (pos, user_id)
             for pos in keys[i:j]
-            for dest_user_id in self.presence_changed[pos]
-        )
+            for user_id in self.presence_changed[pos]
+        ]
 
-        for (key, (dest, user_id)) in dest_user_ids:
+        for (key, user_id) in dest_user_ids:
             rows.append((key, PresenceRow(
-                destination=dest,
                 state=self.presence_map[user_id],
             )))
 
@@ -267,9 +271,12 @@ class FederationRemoteSendQueue(object):
         keys = self.keyed_edu_changed.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
+        # We purposefully clobber based on the key here, python dict comprehensions
+        # always use the last value, so this will correctly point to the last
+        # stream position.
+        keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
 
-        for (pos, (destination, edu_key)) in keyed_edus:
+        for ((destination, edu_key), pos) in keyed_edus.iteritems():
             rows.append((pos, KeyedEduRow(
                 key=edu_key,
                 edu=self.keyed_edu[(destination, edu_key)],
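The new comment leans on a firm language guarantee: when a dict comprehension produces the same key more than once, the last assignment wins. Since ``keys[i:j]`` is ordered by stream position, each ``(destination, key)`` pair ends up mapped to its most recent position. In miniature (values here are made up for illustration)::

    changed = {1: ("hs1", "typing"), 2: ("hs2", "receipt"), 3: ("hs1", "typing")}
    latest = {changed[k]: k for k in sorted(changed)}
    assert latest == {("hs1", "typing"): 3, ("hs2", "receipt"): 2}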
@@ -279,7 +286,7 @@ class FederationRemoteSendQueue(object):
         keys = self.edus.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        edus = set((k, self.edus[k]) for k in keys[i:j])
+        edus = ((k, self.edus[k]) for k in keys[i:j])
 
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
@@ -288,7 +295,7 @@ class FederationRemoteSendQueue(object):
         keys = self.failures.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        failures = set((k, self.failures[k]) for k in keys[i:j])
+        failures = ((k, self.failures[k]) for k in keys[i:j])
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FailureRow(
@@ -300,9 +307,9 @@ class FederationRemoteSendQueue(object):
         keys = self.device_messages.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
+        device_messages = {self.device_messages[k]: k for k in keys[i:j]}
 
-        for (pos, destination) in device_messages:
+        for (destination, pos) in device_messages.iteritems():
             rows.append((pos, DeviceRow(
                 destination=destination,
             )))
@@ -354,7 +361,6 @@ class BaseFederationRow(object):
 
 
 class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
-    "destination",  # str
     "state",  # UserPresenceState
 ))):
     TypeId = "p"
@@ -362,24 +368,24 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
     @staticmethod
     def from_data(data):
         return PresenceRow(
-            destination=data["destination"],
-            state=UserPresenceState.from_dict(data["state"])
+            state=UserPresenceState.from_dict(data)
         )
 
     def to_data(self):
-        return {
-            "destination": self.destination,
-            "state": self.state.as_dict()
-        }
+        return self.state.as_dict()
 
     def add_to_buffer(self, buff):
-        buff.presence.setdefault(self.destination, []).append(self.state)
+        buff.presence.append(self.state)
 
 
 class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
     "key",  # tuple(str) - the edu key passed to send_edu
     "edu",  # Edu
 ))):
+    """Streams EDUs that have an associated key that is ued to clobber. For example,
+    typing EDUs clobber based on room_id.
+    """
+
     TypeId = "k"
 
     @staticmethod
@@ -404,6 +410,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
 class EduRow(BaseFederationRow, namedtuple("EduRow", (
     "edu",  # Edu
 ))):
+    """Streams EDUs that don't have keys. See KeyedEduRow
+    """
     TypeId = "e"
 
     @staticmethod
@@ -421,6 +429,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
     "destination",  # str
     "failure",
 ))):
+    """Streams failures to a remote server. Failures are issued when there was
+    something wrong with a transaction the remote sent us, e.g. it included
+    an event that was invalid.
+    """
+
     TypeId = "f"
 
     @staticmethod
@@ -443,6 +456,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
 class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
     "destination",  # str
 ))):
+    """Streams the fact that either a) there is pending to device messages for
+    users on the remote, or b) a local users device has changed and needs to
+    be sent to the remote.
+    """
     TypeId = "d"
 
     @staticmethod
@@ -469,7 +486,7 @@ TypeToRow = {
 
 
 ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
-    "presence",  # dict of destination -> [UserPresenceState]
+    "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
     "failures",  # dict of destination -> [failures]
@@ -491,7 +508,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence={},
+        presence=[],
         keyed_edus={},
         edus={},
         failures={},
@@ -508,8 +525,8 @@ def process_rows_for_federation(transaction_queue, rows):
         parsed_row = RowType.from_data(row.data)
         parsed_row.add_to_buffer(buff)
 
-    for destination, states in buff.presence.iteritems():
-        transaction_queue.send_presence(destination, states)
+    if buff.presence:
+        transaction_queue.send_presence(buff.presence)
 
     for destination, edu_map in buff.keyed_edus.iteritems():
         for key, edu in edu_map.items():
|
@ -21,11 +21,11 @@ from .units import Transaction, Edu
|
|||||||
|
|
||||||
from synapse.api.errors import HttpResponseException
|
from synapse.api.errors import HttpResponseException
|
||||||
from synapse.util.async import run_on_reactor
|
from synapse.util.async import run_on_reactor
|
||||||
from synapse.util.logcontext import preserve_context_over_fn
|
from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
|
||||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
from synapse.handlers.presence import format_user_presence_state
|
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
@@ -41,6 +41,8 @@ sent_pdus_destination_dist = client_metrics.register_distribution(
 )
 sent_edus_counter = client_metrics.register_counter("sent_edus")
 
+sent_transactions_counter = client_metrics.register_counter("sent_transactions")
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -77,8 +79,18 @@ class TransactionQueue(object):
         # destination -> list of tuple(edu, deferred)
         self.pending_edus_by_dest = edus = {}
 
-        # Presence needs to be separate as we send single aggragate EDUs
+        # Map of user_id -> UserPresenceState for all the pending presence
+        # to be sent out by user_id. Entries here get processed and put in
+        # pending_presence_by_dest
+        self.pending_presence = {}
+
+        # Map of destination -> user_id -> UserPresenceState of pending presence
+        # to be sent to each destinations
         self.pending_presence_by_dest = presence = {}
 
+        # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
+        # based on their key (e.g. typing events by room_id)
+        # Map of destination -> (edu_type, key) -> Edu
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
         metrics.register_callback(
@@ -113,6 +125,8 @@ class TransactionQueue(object):
         self._is_processing = False
         self._last_poked_id = -1
 
+        self._processing_pending_presence = False
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -224,17 +238,71 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
-    def send_presence(self, destination, states):
-        if not self.can_send_to(destination):
-            return
-
-        self.pending_presence_by_dest.setdefault(destination, {}).update({
-            state.user_id: state for state in states
-        })
-
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+    @preserve_fn  # the caller should not yield on this
+    @defer.inlineCallbacks
+    def send_presence(self, states):
+        """Send the new presence states to the appropriate destinations.
+
+        This actually queues up the presence states ready for sending and
+        triggers a background task to process them and send out the transactions.
+
+        Args:
+            states (list(UserPresenceState))
+        """
+
+        # First we queue up the new presence by user ID, so multiple presence
+        # updates in quick successtion are correctly handled
+        # We only want to send presence for our own users, so lets always just
+        # filter here just in case.
+        self.pending_presence.update({
+            state.user_id: state for state in states
+            if self.is_mine_id(state.user_id)
+        })
+
+        # We then handle the new pending presence in batches, first figuring
+        # out the destinations we need to send each state to and then poking it
+        # to attempt a new transaction. We linearize this so that we don't
+        # accidentally mess up the ordering and send multiple presence updates
+        # in the wrong order
+        if self._processing_pending_presence:
+            return
+
+        self._processing_pending_presence = True
+        try:
+            while True:
+                states_map = self.pending_presence
+                self.pending_presence = {}
+
+                if not states_map:
+                    break
+
+                yield self._process_presence_inner(states_map.values())
+        finally:
+            self._processing_pending_presence = False
+
+    @measure_func("txnqueue._process_presence")
+    @defer.inlineCallbacks
+    def _process_presence_inner(self, states):
+        """Given a list of states populate self.pending_presence_by_dest and
+        poke to send a new transaction to each destination
+
+        Args:
+            states (list(UserPresenceState))
+        """
+        hosts_and_states = yield get_interested_remotes(self.store, states)
+
+        for destinations, states in hosts_and_states:
+            for destination in destinations:
+                if not self.can_send_to(destination):
+                    continue
+
+                self.pending_presence_by_dest.setdefault(
+                    destination, {}
+                ).update({
+                    state.user_id: state for state in states
+                })
+
+                preserve_fn(self._attempt_new_transaction)(destination)
 
     def send_edu(self, destination, edu_type, content, key=None):
         edu = Edu(
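The rewritten ``send_presence`` is built around a small linearization idiom: callers merge their updates into ``pending_presence`` and return immediately if a drain is already in progress, so exactly one task flushes the map, in order, until nothing new arrives. Stripped of Twisted and reduced to a synchronous sketch (class and method names here are illustrative, not Synapse helpers)::

    class PendingDrainer(object):
        """Merge updates into a pending map; let a single drainer flush it."""

        def __init__(self, process):
            self.pending = {}
            self.draining = False
            self.process = process  # callable taking a list of values

        def submit(self, updates):
            # Later submitters just top up the map and leave; the running
            # drain loop will pick their entries up on its next iteration.
            self.pending.update(updates)
            if self.draining:
                return
            self.draining = True
            try:
                while self.pending:
                    batch, self.pending = self.pending, {}
                    self.process(list(batch.values()))
            finally:
                self.draining = False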
@@ -374,6 +442,7 @@ class TransactionQueue(object):
             destination, pending_pdus, pending_edus, pending_failures,
         )
         if success:
+            sent_transactions_counter.inc()
             # Remove the acknowledged device messages from the database
             # Only bother if we actually sent some device messages
             if device_message_edus:
@@ -1097,15 +1097,15 @@ class FederationHandler(BaseHandler):
                 user_id,
                 "leave"
             )
-            signed_event = self._sign_event(event)
+            event = self._sign_event(event)
         except SynapseError:
             raise
         except CodeMessageException as e:
             logger.warn("Failed to reject invite: %s", e)
             raise SynapseError(500, "Failed to reject invite")
 
-        # Try the host we successfully got a response to /make_join/
-        # request first.
+        # Try the host that we succesfully called /make_leave/ on first for
+        # the /send_leave/ request.
         try:
             target_hosts.remove(origin)
             target_hosts.insert(0, origin)
@@ -1115,7 +1115,7 @@ class FederationHandler(BaseHandler):
         try:
             yield self.replication_layer.send_leave(
                 target_hosts,
-                signed_event
+                event
             )
         except SynapseError:
             raise
|
@ -318,11 +318,7 @@ class PresenceHandler(object):
|
|||||||
if to_federation_ping:
|
if to_federation_ping:
|
||||||
federation_presence_out_counter.inc_by(len(to_federation_ping))
|
federation_presence_out_counter.inc_by(len(to_federation_ping))
|
||||||
|
|
||||||
_, _, hosts_to_states = yield self._get_interested_parties(
|
self._push_to_remotes(to_federation_ping.values())
|
||||||
to_federation_ping.values()
|
|
||||||
)
|
|
||||||
|
|
||||||
self._push_to_remotes(hosts_to_states)
|
|
||||||
|
|
||||||
def _handle_timeouts(self):
|
def _handle_timeouts(self):
|
||||||
"""Checks the presence of users that have timed out and updates as
|
"""Checks the presence of users that have timed out and updates as
|
||||||
@@ -617,54 +613,6 @@ class PresenceHandler(object):
 
         defer.returnValue(states)
 
-    @defer.inlineCallbacks
-    def _get_interested_parties(self, states, calculate_remote_hosts=True):
-        """Given a list of states return which entities (rooms, users, servers)
-        are interested in the given states.
-
-        Returns:
-            3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
-            with each item being a dict of `entity_name` -> `[UserPresenceState]`
-        """
-        room_ids_to_states = {}
-        users_to_states = {}
-        for state in states:
-            room_ids = yield self.store.get_rooms_for_user(state.user_id)
-            for room_id in room_ids:
-                room_ids_to_states.setdefault(room_id, []).append(state)
-
-            plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
-            for u in plist:
-                users_to_states.setdefault(u, []).append(state)
-
-            # Always notify self
-            users_to_states.setdefault(state.user_id, []).append(state)
-
-        hosts_to_states = {}
-        if calculate_remote_hosts:
-            for room_id, states in room_ids_to_states.items():
-                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-                if not local_states:
-                    continue
-
-                hosts = yield self.store.get_hosts_in_room(room_id)
-
-                for host in hosts:
-                    hosts_to_states.setdefault(host, []).extend(local_states)
-
-        for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
-
-            host = get_domain_from_id(user_id)
-            hosts_to_states.setdefault(host, []).extend(local_states)
-
-        # TODO: de-dup hosts_to_states, as a single host might have multiple
-        # of same presence
-
-        defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
-
     @defer.inlineCallbacks
     def _persist_and_notify(self, states):
         """Persist states in the database, poke the notifier and send to
@@ -672,34 +620,33 @@ class PresenceHandler(object):
         """
         stream_id, max_token = yield self.store.update_presence(states)
 
-        parties = yield self._get_interested_parties(states)
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        parties = yield get_interested_parties(self.store, states)
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-        self._push_to_remotes(hosts_to_states)
+        self._push_to_remotes(states)
 
     @defer.inlineCallbacks
     def notify_for_states(self, state, stream_id):
-        parties = yield self._get_interested_parties([state])
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        parties = yield get_interested_parties(self.store, [state])
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-    def _push_to_remotes(self, hosts_to_states):
+    def _push_to_remotes(self, states):
         """Sends state updates to remote servers.
 
         Args:
-            hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+            states (list(UserPresenceState))
         """
-        for host, states in hosts_to_states.items():
-            self.federation.send_presence(host, states)
+        self.federation.send_presence(states)
 
     @defer.inlineCallbacks
     def incoming_presence(self, origin, content):
@@ -840,14 +787,13 @@ class PresenceHandler(object):
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
-            hosts = set(get_domain_from_id(u) for u in user_ids)
-            self._push_to_remotes({host: (state,) for host in hosts})
+            self._push_to_remotes([state])
         else:
             user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
-            self._push_to_remotes({user.domain: states.values()})
+            self._push_to_remotes(states.values())
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
@@ -1347,3 +1293,66 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
         persist_and_notify = True
 
     return new_state, persist_and_notify, federation_ping
+
+
+@defer.inlineCallbacks
+def get_interested_parties(store, states):
+    """Given a list of states return which entities (rooms, users)
+    are interested in the given states.
+
+    Args:
+        states (list(UserPresenceState))
+
+    Returns:
+        2-tuple: `(room_ids_to_states, users_to_states)`,
+        with each item being a dict of `entity_name` -> `[UserPresenceState]`
+    """
+    room_ids_to_states = {}
+    users_to_states = {}
+    for state in states:
+        room_ids = yield store.get_rooms_for_user(state.user_id)
+        for room_id in room_ids:
+            room_ids_to_states.setdefault(room_id, []).append(state)
+
+        plist = yield store.get_presence_list_observers_accepted(state.user_id)
+        for u in plist:
+            users_to_states.setdefault(u, []).append(state)
+
+        # Always notify self
+        users_to_states.setdefault(state.user_id, []).append(state)
+
+    defer.returnValue((room_ids_to_states, users_to_states))
+
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states):
+    """Given a list of presence states figure out which remote servers
+    should be sent which.
+
+    All the presence states should be for local users only.
+
+    Args:
+        store (DataStore)
+        states (list(UserPresenceState))
+
+    Returns:
+        Deferred list of ([destinations], [UserPresenceState]), where for
+        each row the list of UserPresenceState should be sent to each
+        destination
+    """
+    hosts_and_states = []
+
+    # First we look up the rooms each user is in (as well as any explicit
+    # subscriptions), then for each distinct room we look up the remote
+    # hosts in those rooms.
+    room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
+
+    for room_id, states in room_ids_to_states.iteritems():
+        hosts = yield store.get_hosts_in_room(room_id)
+        hosts_and_states.append((hosts, states))
+
+    for user_id, states in users_to_states.iteritems():
+        host = get_domain_from_id(user_id)
+        hosts_and_states.append(([host], states))
+
+    defer.returnValue(hosts_and_states)
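To make the new return shape concrete, consider a hypothetical local user ``@alice:hs1`` whose room is also joined from ``hs2`` and ``hs3``, and who is on ``@bob:hs4``'s accepted presence list. For a single state update, ``get_interested_remotes`` would resolve to something like::

    [
        (["hs1", "hs2", "hs3"], [alice_state]),  # hosts in the shared room
        (["hs4"], [alice_state]),                # bob's explicit subscription
        (["hs1"], [alice_state]),                # the "always notify self" entry
    ]

``TransactionQueue._process_presence_inner`` above then filters each destination through ``can_send_to`` (which, among other things, refuses the local server) and queues the state for the remaining remotes.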
@@ -1,60 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.http.server import respond_with_json_bytes, request_handler
-from synapse.http.servlet import parse_json_object_from_request
-
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
-
-
-class ExpireCacheResource(Resource):
-    """
-    HTTP endpoint for expiring storage caches.
-
-    POST /_synapse/replication/expire_cache HTTP/1.1
-    Content-Type: application/json
-
-    {
-        "invalidate": [
-            {
-                "name": "func_name",
-                "keys": ["key1", "key2"]
-            }
-        ]
-    }
-    """
-
-    def __init__(self, hs):
-        Resource.__init__(self)  # Resource is old-style, so no super()
-
-        self.store = hs.get_datastore()
-        self.version_string = hs.version_string
-        self.clock = hs.get_clock()
-
-    def render_POST(self, request):
-        self._async_render_POST(request)
-        return NOT_DONE_YET
-
-    @request_handler()
-    def _async_render_POST(self, request):
-        content = parse_json_object_from_request(request)
-
-        for row in content["invalidate"]:
-            name = row["name"]
-            keys = tuple(row["keys"])
-
-            getattr(self.store, name).invalidate(keys)
-
-        respond_with_json_bytes(request, 200, "{}")
@@ -1,59 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.http.server import respond_with_json_bytes, request_handler
-from synapse.http.servlet import parse_json_object_from_request
-
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
-from twisted.internet import defer
-
-
-class PresenceResource(Resource):
-    """
-    HTTP endpoint for marking users as syncing.
-
-    POST /_synapse/replication/presence HTTP/1.1
-    Content-Type: application/json
-
-    {
-        "process_id": "<process_id>",
-        "syncing_users": ["<user_id>"]
-    }
-    """
-
-    def __init__(self, hs):
-        Resource.__init__(self)  # Resource is old-style, so no super()
-
-        self.version_string = hs.version_string
-        self.clock = hs.get_clock()
-        self.presence_handler = hs.get_presence_handler()
-
-    def render_POST(self, request):
-        self._async_render_POST(request)
-        return NOT_DONE_YET
-
-    @request_handler()
-    @defer.inlineCallbacks
-    def _async_render_POST(self, request):
-        content = parse_json_object_from_request(request)
-
-        process_id = content["process_id"]
-        syncing_user_ids = content["syncing_users"]
-
-        yield self.presence_handler.update_external_syncs(
-            process_id, set(syncing_user_ids)
-        )
-
-        respond_with_json_bytes(request, 200, "{}")
@@ -1,54 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.http.server import respond_with_json_bytes, request_handler
-from synapse.http.servlet import parse_json_object_from_request
-
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
-from twisted.internet import defer
-
-
-class PusherResource(Resource):
-    """
-    HTTP endpoint for deleting rejected pushers
-    """
-
-    def __init__(self, hs):
-        Resource.__init__(self)  # Resource is old-style, so no super()
-
-        self.version_string = hs.version_string
-        self.store = hs.get_datastore()
-        self.notifier = hs.get_notifier()
-        self.clock = hs.get_clock()
-
-    def render_POST(self, request):
-        self._async_render_POST(request)
-        return NOT_DONE_YET
-
-    @request_handler()
-    @defer.inlineCallbacks
-    def _async_render_POST(self, request):
-        content = parse_json_object_from_request(request)
-
-        for remove in content["remove"]:
-            yield self.store.delete_pusher_by_app_id_pushkey_user_id(
-                remove["app_id"],
-                remove["push_key"],
-                remove["user_id"],
-            )
-
-        self.notifier.on_new_replication_data()
-
-        respond_with_json_bytes(request, 200, "{}")
@ -1,576 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
# Copyright 2015 OpenMarket Ltd
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
from synapse.http.servlet import parse_integer, parse_string
|
|
||||||
from synapse.http.server import request_handler, finish_request
|
|
||||||
from synapse.replication.pusher_resource import PusherResource
|
|
||||||
from synapse.replication.presence_resource import PresenceResource
|
|
||||||
from synapse.replication.expire_cache import ExpireCacheResource
|
|
||||||
from synapse.api.errors import SynapseError
|
|
||||||
|
|
||||||
from twisted.web.resource import Resource
|
|
||||||
from twisted.web.server import NOT_DONE_YET
|
|
||||||
from twisted.internet import defer
|
|
||||||
|
|
||||||
import ujson as json
|
|
||||||
|
|
||||||
import collections
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
REPLICATION_PREFIX = "/_synapse/replication"
|
|
||||||
|
|
||||||
STREAM_NAMES = (
|
|
||||||
("events",),
|
|
||||||
("presence",),
|
|
||||||
("typing",),
|
|
||||||
("receipts",),
|
|
||||||
("user_account_data", "room_account_data", "tag_account_data",),
|
|
||||||
("backfill",),
|
|
||||||
("push_rules",),
|
|
||||||
("pushers",),
|
|
||||||
("caches",),
|
|
||||||
("to_device",),
|
|
||||||
("public_rooms",),
|
|
||||||
("federation",),
|
|
||||||
("device_lists",),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ReplicationResource(Resource):
|
|
||||||
"""
|
|
||||||
HTTP endpoint for extracting data from synapse.
|
|
||||||
|
|
||||||
The streams of data returned by the endpoint are controlled by the
|
|
||||||
parameters given to the API. To return a given stream pass a query
|
|
||||||
parameter with a position in the stream to return data from or the
|
|
||||||
special value "-1" to return data from the start of the stream.
|
|
||||||
|
|
||||||
If there is no data for any of the supplied streams after the given
|
|
||||||
position then the request will block until there is data for one
|
|
||||||
of the streams. This allows clients to long-poll this API.
|
|
||||||
|
|
||||||
The possible streams are:
|
|
||||||
|
|
||||||
* "streams": A special stream returing the positions of other streams.
|
|
||||||
* "events": The new events seen on the server.
|
|
||||||
* "presence": Presence updates.
|
|
||||||
* "typing": Typing updates.
|
|
||||||
* "receipts": Receipt updates.
|
|
||||||
* "user_account_data": Top-level per user account data.
|
|
||||||
* "room_account_data: Per room per user account data.
|
|
||||||
* "tag_account_data": Per room per user tags.
|
|
||||||
* "backfill": Old events that have been backfilled from other servers.
|
|
||||||
* "push_rules": Per user changes to push rules.
|
|
||||||
* "pushers": Per user changes to their pushers.
|
|
||||||
* "caches": Cache invalidations.
|
|
||||||
|
|
||||||
The API takes two additional query parameters:
|
|
||||||
|
|
||||||
* "timeout": How long to wait before returning an empty response.
|
|
||||||
* "limit": The maximum number of rows to return for the selected streams.
|
|
||||||
|
|
||||||
The response is a JSON object with keys for each stream with updates. Under
|
|
||||||
each key is a JSON object with:
|
|
||||||
|
|
||||||
* "position": The current position of the stream.
|
|
||||||
* "field_names": The names of the fields in each row.
|
|
||||||
* "rows": The updates as an array of arrays.
|
|
||||||
|
|
||||||
There are a number of ways this API could be used:
|
|
||||||
|
|
||||||
1) To replicate the contents of the backing database to another database.
|
|
||||||
2) To be notified when the contents of a shared backing database changes.
|
|
||||||
3) To "tail" the activity happening on a server for debugging.
|
|
||||||
|
|
||||||
In the first case the client would track all of the streams and store it's
|
|
||||||
own copy of the data.
|
|
||||||
|
|
||||||
In the second case the client might theoretically just be able to follow
|
|
||||||
the "streams" stream to track where the other streams are. However in
|
|
||||||
practise it will probably need to get the contents of the streams in
|
|
||||||
order to expire the any in-memory caches. Whether it gets the contents
|
|
||||||
of the streams from this replication API or directly from the backing
|
|
||||||
store is a matter of taste.
|
|
||||||
|
|
||||||
In the third case the client would use the "streams" stream to find what
|
|
||||||
streams are available and their current positions. Then it can start
|
|
||||||
long-polling this replication API for new data on those streams.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, hs):
|
|
||||||
Resource.__init__(self) # Resource is old-style, so no super()
|
|
||||||
|
|
||||||
self.version_string = hs.version_string
|
|
||||||
self.store = hs.get_datastore()
|
|
||||||
self.sources = hs.get_event_sources()
|
|
||||||
self.presence_handler = hs.get_presence_handler()
|
|
||||||
self.typing_handler = hs.get_typing_handler()
|
|
||||||
self.federation_sender = hs.get_federation_sender()
|
|
||||||
self.notifier = hs.notifier
|
|
||||||
self.clock = hs.get_clock()
|
|
||||||
self.config = hs.get_config()
|
|
||||||
|
|
||||||
self.putChild("remove_pushers", PusherResource(hs))
|
|
||||||
self.putChild("syncing_users", PresenceResource(hs))
|
|
||||||
self.putChild("expire_cache", ExpireCacheResource(hs))
|
|
||||||
|
|
||||||
    def render_GET(self, request):
        self._async_render_GET(request)
        return NOT_DONE_YET

    @defer.inlineCallbacks
    def current_replication_token(self):
        stream_token = yield self.sources.get_current_token()
        backfill_token = yield self.store.get_current_backfill_token()
        push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
        pushers_token = self.store.get_pushers_stream_token()
        caches_token = self.store.get_cache_stream_token()
        public_rooms_token = self.store.get_current_public_room_stream_id()
        federation_token = self.federation_sender.get_current_token()
        device_list_token = self.store.get_device_stream_token()

        defer.returnValue(_ReplicationToken(
            room_stream_token,
            int(stream_token.presence_key),
            int(stream_token.typing_key),
            int(stream_token.receipt_key),
            int(stream_token.account_data_key),
            backfill_token,
            push_rules_token,
            pushers_token,
            0,  # State stream is no longer a thing
            caches_token,
            int(stream_token.to_device_key),
            int(public_rooms_token),
            int(federation_token),
            int(device_list_token),
        ))

    @request_handler()
    @defer.inlineCallbacks
    def _async_render_GET(self, request):
        limit = parse_integer(request, "limit", 100)
        timeout = parse_integer(request, "timeout", 10 * 1000)

        request.setHeader(b"Content-Type", b"application/json")

        request_streams = {
            name: parse_integer(request, name)
            for names in STREAM_NAMES for name in names
        }
        request_streams["streams"] = parse_string(request, "streams")

        federation_ack = parse_integer(request, "federation_ack", None)

        def replicate():
            return self.replicate(
                request_streams, limit,
                federation_ack=federation_ack
            )

        writer = yield self.notifier.wait_for_replication(replicate, timeout)
        result = writer.finish()

        for stream_name, stream_content in result.items():
            logger.info(
                "Replicating %d rows of %s from %s -> %s",
                len(stream_content["rows"]),
                stream_name,
                request_streams.get(stream_name),
                stream_content["position"],
            )

        request.write(json.dumps(result, ensure_ascii=False))
        finish_request(request)
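
    # A hypothetical external client (not part of synapse; it assumes the
    # resource is mounted at /_synapse/replication on a replication listener
    # such as port 9092, and uses the third-party `requests` library) could
    # long-poll the handler above roughly like this:
    #
    #   import requests
    #
    #   position = "-1"  # "-1" asks for the current positions of all streams
    #   while True:
    #       response = requests.get(
    #           "http://127.0.0.1:9092/_synapse/replication",
    #           params={"streams": position, "timeout": 30000},
    #       ).json()
    #       for name, stream in response.items():
    #           handle_rows(name, stream["field_names"], stream["rows"])
    #       if "streams" in response:
    #           position = response["streams"]["position"]
    #
    # where handle_rows is the client's own callback.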

    @defer.inlineCallbacks
    def replicate(self, request_streams, limit, federation_ack=None):
        writer = _Writer()
        current_token = yield self.current_replication_token()
        logger.debug("Replicating up to %r", current_token)

        if limit == 0:
            raise SynapseError(400, "Limit cannot be 0")

        yield self.account_data(writer, current_token, limit, request_streams)
        yield self.events(writer, current_token, limit, request_streams)
        # TODO: implement limit
        yield self.presence(writer, current_token, request_streams)
        yield self.typing(writer, current_token, request_streams)
        yield self.receipts(writer, current_token, limit, request_streams)
        yield self.push_rules(writer, current_token, limit, request_streams)
        yield self.pushers(writer, current_token, limit, request_streams)
        yield self.caches(writer, current_token, limit, request_streams)
        yield self.to_device(writer, current_token, limit, request_streams)
        yield self.public_rooms(writer, current_token, limit, request_streams)
        yield self.device_lists(writer, current_token, limit, request_streams)
        self.federation(writer, current_token, limit, request_streams, federation_ack)
        self.streams(writer, current_token, request_streams)

        logger.debug("Replicated %d rows", writer.total)
        defer.returnValue(writer)
    def streams(self, writer, current_token, request_streams):
        request_token = request_streams.get("streams")

        streams = []

        if request_token is not None:
            if request_token == "-1":
                for names, position in zip(STREAM_NAMES, current_token):
                    streams.extend((name, position) for name in names)
            else:
                items = zip(
                    STREAM_NAMES,
                    current_token,
                    _ReplicationToken(request_token)
                )
                for names, current_id, last_id in items:
                    if last_id < current_id:
                        streams.extend((name, current_id) for name in names)

        if streams:
            writer.write_header_and_rows(
                "streams", streams, ("name", "position"),
                position=str(current_token)
            )
    @defer.inlineCallbacks
    def events(self, writer, current_token, limit, request_streams):
        request_events = request_streams.get("events")
        request_backfill = request_streams.get("backfill")

        if request_events is not None or request_backfill is not None:
            if request_events is None:
                request_events = current_token.events
            if request_backfill is None:
                request_backfill = current_token.backfill

            no_new_tokens = (
                request_events == current_token.events
                and request_backfill == current_token.backfill
            )
            if no_new_tokens:
                return

            res = yield self.store.get_all_new_events(
                request_backfill, request_events,
                current_token.backfill, current_token.events,
                limit
            )

            upto_events_token = _position_from_rows(
                res.new_forward_events, current_token.events
            )

            upto_backfill_token = _position_from_rows(
                res.new_backfill_events, current_token.backfill
            )

            if request_events != upto_events_token:
                writer.write_header_and_rows("events", res.new_forward_events, (
                    "position", "event_id", "room_id", "type", "state_key",
                ), position=upto_events_token)

            if request_backfill != upto_backfill_token:
                writer.write_header_and_rows("backfill", res.new_backfill_events, (
                    "position", "event_id", "room_id", "type", "state_key", "redacts",
                ), position=upto_backfill_token)

            writer.write_header_and_rows(
                "forward_ex_outliers", res.forward_ex_outliers,
                ("position", "event_id", "state_group"),
            )
            writer.write_header_and_rows(
                "backward_ex_outliers", res.backward_ex_outliers,
                ("position", "event_id", "state_group"),
            )
    @defer.inlineCallbacks
    def presence(self, writer, current_token, request_streams):
        current_position = current_token.presence

        request_presence = request_streams.get("presence")

        if request_presence is not None and request_presence != current_position:
            presence_rows = yield self.presence_handler.get_all_presence_updates(
                request_presence, current_position
            )
            upto_token = _position_from_rows(presence_rows, current_position)
            writer.write_header_and_rows("presence", presence_rows, (
                "position", "user_id", "state", "last_active_ts",
                "last_federation_update_ts", "last_user_sync_ts",
                "status_msg", "currently_active",
            ), position=upto_token)
    @defer.inlineCallbacks
    def typing(self, writer, current_token, request_streams):
        current_position = current_token.typing

        request_typing = request_streams.get("typing")

        if request_typing is not None and request_typing != current_position:
            # If they have a higher token than current max, we can assume that
            # they had been talking to a previous instance of the master. Since
            # we reset the token on restart, the best (but hacky) thing we can
            # do is to simply resend down all the typing notifications.
            if request_typing > current_position:
                request_typing = 0

            typing_rows = yield self.typing_handler.get_all_typing_updates(
                request_typing, current_position
            )
            upto_token = _position_from_rows(typing_rows, current_position)
            writer.write_header_and_rows("typing", typing_rows, (
                "position", "room_id", "typing"
            ), position=upto_token)
    @defer.inlineCallbacks
    def receipts(self, writer, current_token, limit, request_streams):
        current_position = current_token.receipts

        request_receipts = request_streams.get("receipts")

        if request_receipts is not None and request_receipts != current_position:
            receipts_rows = yield self.store.get_all_updated_receipts(
                request_receipts, current_position, limit
            )
            upto_token = _position_from_rows(receipts_rows, current_position)
            writer.write_header_and_rows("receipts", receipts_rows, (
                "position", "room_id", "receipt_type", "user_id", "event_id", "data"
            ), position=upto_token)
    @defer.inlineCallbacks
    def account_data(self, writer, current_token, limit, request_streams):
        current_position = current_token.account_data

        user_account_data = request_streams.get("user_account_data")
        room_account_data = request_streams.get("room_account_data")
        tag_account_data = request_streams.get("tag_account_data")

        if user_account_data is not None or room_account_data is not None:
            if user_account_data is None:
                user_account_data = current_position
            if room_account_data is None:
                room_account_data = current_position

            no_new_tokens = (
                user_account_data == current_position
                and room_account_data == current_position
            )
            if no_new_tokens:
                return

            user_rows, room_rows = yield self.store.get_all_updated_account_data(
                user_account_data, room_account_data, current_position, limit
            )

            upto_users_token = _position_from_rows(user_rows, current_position)
            upto_rooms_token = _position_from_rows(room_rows, current_position)

            writer.write_header_and_rows("user_account_data", user_rows, (
                "position", "user_id", "type", "content"
            ), position=upto_users_token)
            writer.write_header_and_rows("room_account_data", room_rows, (
                "position", "user_id", "room_id", "type", "content"
            ), position=upto_rooms_token)

        if tag_account_data is not None:
            tag_rows = yield self.store.get_all_updated_tags(
                tag_account_data, current_position, limit
            )
            upto_tag_token = _position_from_rows(tag_rows, current_position)
            writer.write_header_and_rows("tag_account_data", tag_rows, (
                "position", "user_id", "room_id", "tags"
            ), position=upto_tag_token)
    @defer.inlineCallbacks
    def push_rules(self, writer, current_token, limit, request_streams):
        current_position = current_token.push_rules

        push_rules = request_streams.get("push_rules")

        if push_rules is not None and push_rules != current_position:
            rows = yield self.store.get_all_push_rule_updates(
                push_rules, current_position, limit
            )
            upto_token = _position_from_rows(rows, current_position)
            writer.write_header_and_rows("push_rules", rows, (
                "position", "event_stream_ordering", "user_id", "rule_id", "op",
                "priority_class", "priority", "conditions", "actions"
            ), position=upto_token)
    @defer.inlineCallbacks
    def pushers(self, writer, current_token, limit, request_streams):
        current_position = current_token.pushers

        pushers = request_streams.get("pushers")

        if pushers is not None and pushers != current_position:
            updated, deleted = yield self.store.get_all_updated_pushers(
                pushers, current_position, limit
            )
            upto_token = _position_from_rows(updated, current_position)
            writer.write_header_and_rows("pushers", updated, (
                "position", "user_id", "access_token", "profile_tag", "kind",
                "app_id", "app_display_name", "device_display_name", "pushkey",
                "ts", "lang", "data"
            ), position=upto_token)
            writer.write_header_and_rows("deleted_pushers", deleted, (
                "position", "user_id", "app_id", "pushkey"
            ), position=upto_token)
    @defer.inlineCallbacks
    def caches(self, writer, current_token, limit, request_streams):
        current_position = current_token.caches

        caches = request_streams.get("caches")

        if caches is not None and caches != current_position:
            updated_caches = yield self.store.get_all_updated_caches(
                caches, current_position, limit
            )
            upto_token = _position_from_rows(updated_caches, current_position)
            writer.write_header_and_rows("caches", updated_caches, (
                "position", "cache_func", "keys", "invalidation_ts"
            ), position=upto_token)
    @defer.inlineCallbacks
    def to_device(self, writer, current_token, limit, request_streams):
        current_position = current_token.to_device

        to_device = request_streams.get("to_device")

        if to_device is not None and to_device != current_position:
            to_device_rows = yield self.store.get_all_new_device_messages(
                to_device, current_position, limit
            )
            upto_token = _position_from_rows(to_device_rows, current_position)
            writer.write_header_and_rows("to_device", to_device_rows, (
                "position", "user_id", "device_id", "message_json"
            ), position=upto_token)
    @defer.inlineCallbacks
    def public_rooms(self, writer, current_token, limit, request_streams):
        current_position = current_token.public_rooms

        public_rooms = request_streams.get("public_rooms")

        if public_rooms is not None and public_rooms != current_position:
            public_rooms_rows = yield self.store.get_all_new_public_rooms(
                public_rooms, current_position, limit
            )
            upto_token = _position_from_rows(public_rooms_rows, current_position)
            writer.write_header_and_rows("public_rooms", public_rooms_rows, (
                "position", "room_id", "visibility", "appservice_id", "network_id",
            ), position=upto_token)
    def federation(self, writer, current_token, limit, request_streams, federation_ack):
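        # If this process is sending federation traffic itself there is
        # presumably no separate federation_sender worker consuming this
        # stream, so don't expose it.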
        if self.config.send_federation:
            return

        current_position = current_token.federation

        federation = request_streams.get("federation")

        if federation is not None and federation != current_position:
            federation_rows = self.federation_sender.get_replication_rows(
                federation, current_position, limit, federation_ack=federation_ack,
            )
            upto_token = _position_from_rows(federation_rows, current_position)
            writer.write_header_and_rows("federation", federation_rows, (
                "position", "type", "content",
            ), position=upto_token)
    @defer.inlineCallbacks
    def device_lists(self, writer, current_token, limit, request_streams):
        current_position = current_token.device_lists

        device_lists = request_streams.get("device_lists")

        if device_lists is not None and device_lists != current_position:
            changes = yield self.store.get_all_device_list_changes_for_remotes(
                device_lists, current_position,
            )
            writer.write_header_and_rows("device_lists", changes, (
                "position", "user_id", "destination",
            ), position=current_position)
class _Writer(object):
    """Writes the streams as a JSON object as the response to the request"""
    def __init__(self):
        self.streams = {}
        self.total = 0

    def write_header_and_rows(self, name, rows, fields, position=None):
        if position is None:
            if rows:
                position = rows[-1][0]
            else:
                return

        self.streams[name] = {
            "position": position if type(position) is int else str(position),
            "field_names": fields,
            "rows": rows,
        }

        self.total += len(rows)

    def __nonzero__(self):
        return bool(self.total)

    def finish(self):
        return self.streams
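
    # For illustration (the values below are made up): after a single
    # write_header_and_rows("typing", ...) call, finish() returns something
    # like:
    #
    #   {"typing": {"position": 12,
    #               "field_names": ("position", "room_id", "typing"),
    #               "rows": [[12, "!room:example.com", ["@user:example.com"]]]}}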
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
    "events", "presence", "typing", "receipts", "account_data", "backfill",
    "push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
    "federation", "device_lists",
))):
    __slots__ = []

    def __new__(cls, *args):
        if len(args) == 1:
            streams = [int(value) for value in args[0].split("_")]
            if len(streams) < len(cls._fields):
                streams.extend([0] * (len(cls._fields) - len(streams)))
            return cls(*streams)
        else:
            return super(_ReplicationToken, cls).__new__(cls, *args)

    def __str__(self):
        return "_".join(str(value) for value in self)
def _position_from_rows(rows, current_position):
    """Calculates a position to return for a stream. Ideally we want to return
    the position of the last row, as that will be the most correct. However,
    if there are no rows we fall back to using the current position to stop us
    from repeatedly hitting the storage layer unnecessarily, thinking there
    are updates. (Not all advances of the token correspond to an actual
    update.)

    We can't just always return the current position, as we often limit the
    number of rows we replicate, and so the stream may lag. The assumption is
    that if the storage layer returns no new rows then we are not lagging and
    we are at the `current_position`.
    """
    if rows:
        return rows[-1][0]
    return current_position
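
# For example (made-up values): if the requested position was 10, the current
# position is 15, and the storage layer returned rows up to 12 because of the
# limit, we report 12 so the client re-requests from there; with no rows at
# all we report 15 to stop it polling the storage layer again.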
@@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore):
     # to reach inside the __dict__ to extract them.
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
     get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+    get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
     get_users_who_share_room_with_user = (
         RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
     )
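
A minimal sketch of the pattern in this hunk (FooStore and get_foo are
illustrative names, not real synapse classes): the cached methods are created
dynamically by the @cached decorator, so the slaved store copies the raw
entry out of the parent class's __dict__ rather than going through normal
attribute lookup, which would bind the descriptor:

    class SlavedFooStore(BaseSlavedStore):
        # reuse the master store's cached accessor, cache wiring included
        get_foo = FooStore.__dict__["get_foo"]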

@@ -39,6 +39,16 @@ class SlavedPresenceStore(BaseSlavedStore):
     _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
     get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
 
+    # XXX: This is a bit broken because we don't persist the accepted list in a
+    # way that can be replicated. This means that we don't have a way to
+    # invalidate the cache correctly.
+    get_presence_list_accepted = PresenceStore.__dict__[
+        "get_presence_list_accepted"
+    ]
+    get_presence_list_observers_accepted = PresenceStore.__dict__[
+        "get_presence_list_observers_accepted"
+    ]
+
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()

@@ -120,6 +120,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         where_clauses.append("(user_id = ? AND device_id = ?)")
         bindings.extend((user_id, device_id))
 
+        if not where_clauses:
+            return []
+
         inner_select = (
             "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips "
             "WHERE %(where)s "

@@ -1,204 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import json

from mock import Mock, NonCallableMock
from twisted.internet import defer

import synapse.types
from synapse.replication.resource import ReplicationResource
from synapse.types import UserID
from tests import unittest
from tests.utils import setup_test_homeserver


class ReplicationResourceCase(unittest.TestCase):
    @defer.inlineCallbacks
    def setUp(self):
        self.hs = yield setup_test_homeserver(
            "red",
            http_client=None,
            replication_layer=Mock(),
            ratelimiter=NonCallableMock(spec_set=[
                "send_message",
            ]),
        )
        self.user_id = "@seeing:red"
        self.user = UserID.from_string(self.user_id)

        self.hs.get_ratelimiter().send_message.return_value = (True, 0)

        self.resource = ReplicationResource(self.hs)

    @defer.inlineCallbacks
    def test_streams(self):
        # Passing "-1" returns the current stream positions
        code, body = yield self.get(streams="-1")
        self.assertEquals(code, 200)
        self.assertEquals(body["streams"]["field_names"], ["name", "position"])
        position = body["streams"]["position"]
        # Passing the current position returns an empty response after the
        # timeout
        get = self.get(streams=str(position), timeout="0")
        self.hs.clock.advance_time_msec(1)
        code, body = yield get
        self.assertEquals(code, 200)
        self.assertEquals(body, {})

    @defer.inlineCallbacks
    def test_events(self):
        get = self.get(events="-1", timeout="0")
        yield self.hs.get_handlers().room_creation_handler.create_room(
            synapse.types.create_requester(self.user), {}
        )
        code, body = yield get
        self.assertEquals(code, 200)
        self.assertEquals(body["events"]["field_names"], [
            "position", "event_id", "room_id", "type", "state_key",
        ])

    @defer.inlineCallbacks
    def test_presence(self):
        get = self.get(presence="-1")
        yield self.hs.get_presence_handler().set_state(
            self.user, {"presence": "online"}
        )
        code, body = yield get
        self.assertEquals(code, 200)
        self.assertEquals(body["presence"]["field_names"], [
            "position", "user_id", "state", "last_active_ts",
            "last_federation_update_ts", "last_user_sync_ts",
            "status_msg", "currently_active",
        ])

    @defer.inlineCallbacks
    def test_typing(self):
        room_id = yield self.create_room()
        get = self.get(typing="-1")
        yield self.hs.get_typing_handler().started_typing(
            self.user, self.user, room_id, timeout=2
        )
        code, body = yield get
        self.assertEquals(code, 200)
        self.assertEquals(body["typing"]["field_names"], [
            "position", "room_id", "typing"
        ])

    @defer.inlineCallbacks
    def test_receipts(self):
        room_id = yield self.create_room()
        event_id = yield self.send_text_message(room_id, "Hello, World")
        get = self.get(receipts="-1")
        yield self.hs.get_receipts_handler().received_client_receipt(
            room_id, "m.read", self.user_id, event_id
        )
        code, body = yield get
        self.assertEquals(code, 200)
        self.assertEquals(body["receipts"]["field_names"], [
            "position", "room_id", "receipt_type", "user_id", "event_id", "data"
        ])

    def _test_timeout(stream):
        """Check that a request for the given stream times out"""
        @defer.inlineCallbacks
        def test_timeout(self):
            get = self.get(**{stream: "-1", "timeout": "0"})
            self.hs.clock.advance_time_msec(1)
            code, body = yield get
            self.assertEquals(code, 200)
            self.assertEquals(body.get("rows", []), [])
        test_timeout.__name__ = "test_timeout_%s" % (stream)
        return test_timeout

    test_timeout_events = _test_timeout("events")
    test_timeout_presence = _test_timeout("presence")
    test_timeout_typing = _test_timeout("typing")
    test_timeout_receipts = _test_timeout("receipts")
    test_timeout_user_account_data = _test_timeout("user_account_data")
    test_timeout_room_account_data = _test_timeout("room_account_data")
    test_timeout_tag_account_data = _test_timeout("tag_account_data")
    test_timeout_backfill = _test_timeout("backfill")
    test_timeout_push_rules = _test_timeout("push_rules")
    test_timeout_pushers = _test_timeout("pushers")
    test_timeout_state = _test_timeout("state")
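    # Each call above stamps out a test method whose __name__ is
    # "test_timeout_<stream>", giving every stream its own timeout check.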

    @defer.inlineCallbacks
    def send_text_message(self, room_id, message):
        handler = self.hs.get_handlers().message_handler
        event = yield handler.create_and_send_nonmember_event(
            synapse.types.create_requester(self.user),
            {
                "type": "m.room.message",
                "content": {"body": message, "msgtype": "m.text"},
                "room_id": room_id,
                "sender": self.user.to_string(),
            }
        )
        defer.returnValue(event.event_id)

    @defer.inlineCallbacks
    def create_room(self):
        result = yield self.hs.get_handlers().room_creation_handler.create_room(
            synapse.types.create_requester(self.user), {}
        )
        defer.returnValue(result["room_id"])

    @defer.inlineCallbacks
    def get(self, **params):
        request = NonCallableMock(spec_set=[
            "write", "finish", "setResponseCode", "setHeader", "args",
            "method", "processing"
        ])

        request.method = "GET"
        request.args = {k: [v] for k, v in params.items()}

        @contextlib.contextmanager
        def processing():
            yield
        request.processing = processing
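        # The wrapped handler uses request.processing() as a context manager
        # while rendering, so the mock must supply a real context manager
        # rather than a plain Mock attribute.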

        yield self.resource._async_render_GET(request)
        self.assertTrue(request.finish.called)

        if request.setResponseCode.called:
            response_code = request.setResponseCode.call_args[0][0]
        else:
            response_code = 200

        response_json = "".join(
            call[0][0] for call in request.write.call_args_list
        )
        response_body = json.loads(response_json)

        if response_code == 200:
            self.check_response(response_body)

        defer.returnValue((response_code, response_body))

    def check_response(self, response_body):
        for name, stream in response_body.items():
            self.assertIn("field_names", stream)
            field_names = stream["field_names"]
            self.assertIn("rows", stream)
            for row in stream["rows"]:
                self.assertEquals(
                    len(row), len(field_names),
                    "%s: len(row = %r) == len(field_names = %r)" % (
                        name, row, field_names
                    )
                )