Store federation stream positions in the database
This commit is contained in:
parent
f8ee66250a
commit
7c9cdb2245
|
@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer):
|
||||||
http_client = self.get_simple_http_client()
|
http_client = self.get_simple_http_client()
|
||||||
store = self.get_datastore()
|
store = self.get_datastore()
|
||||||
replication_url = self.config.worker_replication_url
|
replication_url = self.config.worker_replication_url
|
||||||
send_handler = self._get_send_handler()
|
send_handler = FederationSenderHandler(self)
|
||||||
|
|
||||||
|
send_handler.on_start()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
args = store.stream_positions()
|
args = store.stream_positions()
|
||||||
args.update(send_handler.stream_positions())
|
args.update((yield send_handler.stream_positions()))
|
||||||
args["timeout"] = 30000
|
args["timeout"] = 30000
|
||||||
result = yield http_client.get_json(replication_url, args=args)
|
result = yield http_client.get_json(replication_url, args=args)
|
||||||
yield store.process_replication(result)
|
yield store.process_replication(result)
|
||||||
send_handler.process_replication(result)
|
yield send_handler.process_replication(result)
|
||||||
except:
|
except:
|
||||||
logger.exception("Error replicating from %r", replication_url)
|
logger.exception("Error replicating from %r", replication_url)
|
||||||
yield sleep(30)
|
yield sleep(30)
|
||||||
|
|
||||||
def _get_send_handler(self):
|
|
||||||
try:
|
|
||||||
return self._send_handler
|
|
||||||
except AttributeError:
|
|
||||||
self._send_handler = FederationSenderHandler(self)
|
|
||||||
return self._send_handler
|
|
||||||
|
|
||||||
|
|
||||||
def start(config_options):
|
def start(config_options):
|
||||||
try:
|
try:
|
||||||
|
@ -221,22 +216,29 @@ def start(config_options):
|
||||||
|
|
||||||
class FederationSenderHandler(object):
|
class FederationSenderHandler(object):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
self.store = hs.get_datastore()
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
|
|
||||||
self._latest_room_serial = -1
|
|
||||||
self._room_serials = {}
|
self._room_serials = {}
|
||||||
self._room_typing = {}
|
self._room_typing = {}
|
||||||
|
|
||||||
def stream_positions(self):
|
def on_start(self):
|
||||||
# We must update this token from the response of the previous
|
# There may be some events that are persisted but haven't been sent,
|
||||||
# sync. In particular, the stream id may "reset" back to zero/a low
|
# so send them now.
|
||||||
# value which we *must* use for the next replication request.
|
self.federation_sender.notify_new_events(
|
||||||
return {"federation": self._latest_room_serial}
|
self.store.get_room_max_stream_ordering()
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def stream_positions(self):
|
||||||
|
stream_id = yield self.store.get_federation_out_pos("federation")
|
||||||
|
defer.returnValue({"federation": stream_id})
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def process_replication(self, result):
|
def process_replication(self, result):
|
||||||
fed_stream = result.get("federation")
|
fed_stream = result.get("federation")
|
||||||
if fed_stream:
|
if fed_stream:
|
||||||
self._latest_room_serial = int(fed_stream["position"])
|
latest_id = int(fed_stream["position"])
|
||||||
|
|
||||||
presence_to_send = {}
|
presence_to_send = {}
|
||||||
keyed_edus = {}
|
keyed_edus = {}
|
||||||
|
@ -296,6 +298,10 @@ class FederationSenderHandler(object):
|
||||||
for destination in device_destinations:
|
for destination in device_destinations:
|
||||||
self.federation_sender.send_device_messages(destination)
|
self.federation_sender.send_device_messages(destination)
|
||||||
|
|
||||||
|
yield self.store.update_federation_out_pos(
|
||||||
|
"federation", latest_id
|
||||||
|
)
|
||||||
|
|
||||||
event_stream = result.get("events")
|
event_stream = result.get("events")
|
||||||
if event_stream:
|
if event_stream:
|
||||||
latest_pos = event_stream["position"]
|
latest_pos = event_stream["position"]
|
||||||
|
|
|
@ -106,7 +106,7 @@ class TransactionQueue(object):
|
||||||
self._order = 1
|
self._order = 1
|
||||||
|
|
||||||
self._is_processing = False
|
self._is_processing = False
|
||||||
self._last_token = 0
|
self._last_poked_id = -1
|
||||||
|
|
||||||
def can_send_to(self, destination):
|
def can_send_to(self, destination):
|
||||||
"""Can we send messages to the given server?
|
"""Can we send messages to the given server?
|
||||||
|
@ -130,17 +130,22 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def notify_new_events(self, current_id):
|
def notify_new_events(self, current_id):
|
||||||
|
self._last_poked_id = max(current_id, self._last_poked_id)
|
||||||
|
|
||||||
if self._is_processing:
|
if self._is_processing:
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._is_processing = True
|
self._is_processing = True
|
||||||
while True:
|
while True:
|
||||||
self._last_token, events = yield self.store.get_all_new_events_stream(
|
last_token = yield self.store.get_federation_out_pos("events")
|
||||||
self._last_token, current_id, limit=20,
|
next_token, events = yield self.store.get_all_new_events_stream(
|
||||||
|
last_token, self._last_poked_id, limit=20,
|
||||||
)
|
)
|
||||||
|
|
||||||
if not events:
|
logger.debug("Handling %s -> %s", last_token, next_token)
|
||||||
|
|
||||||
|
if not events and next_token >= self._last_poked_id:
|
||||||
break
|
break
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
|
@ -151,7 +156,15 @@ class TransactionQueue(object):
|
||||||
destinations = [
|
destinations = [
|
||||||
get_domain_from_id(user_id) for user_id in users_in_room
|
get_domain_from_id(user_id) for user_id in users_in_room
|
||||||
]
|
]
|
||||||
|
|
||||||
|
logger.debug("Sending %s to %r", event, destinations)
|
||||||
|
|
||||||
self.send_pdu(event, destinations)
|
self.send_pdu(event, destinations)
|
||||||
|
|
||||||
|
yield self.store.update_federation_out_pos(
|
||||||
|
"events", next_token
|
||||||
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
self._is_processing = False
|
self._is_processing = False
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,9 @@ class SlavedEventStore(BaseSlavedStore):
|
||||||
|
|
||||||
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
|
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
|
||||||
|
|
||||||
|
get_federation_out_pos = DataStore.get_federation_out_pos.__func__
|
||||||
|
update_federation_out_pos = DataStore.update_federation_out_pos.__func__
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedEventStore, self).stream_positions()
|
result = super(SlavedEventStore, self).stream_positions()
|
||||||
result["events"] = self._stream_id_gen.get_current_token()
|
result["events"] = self._stream_id_gen.get_current_token()
|
||||||
|
|
|
@ -561,12 +561,17 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
|
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
|
||||||
|
if keyvalues:
|
||||||
|
where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
|
||||||
|
else:
|
||||||
|
where = ""
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
|
"SELECT %(retcol)s FROM %(table)s %(where)s"
|
||||||
) % {
|
) % {
|
||||||
"retcol": retcol,
|
"retcol": retcol,
|
||||||
"table": table,
|
"table": table,
|
||||||
"where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
|
"where": where,
|
||||||
}
|
}
|
||||||
|
|
||||||
txn.execute(sql, keyvalues.values())
|
txn.execute(sql, keyvalues.values())
|
||||||
|
@ -744,10 +749,15 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
|
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
|
||||||
update_sql = "UPDATE %s SET %s WHERE %s" % (
|
if keyvalues:
|
||||||
|
where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
|
||||||
|
else:
|
||||||
|
where = ""
|
||||||
|
|
||||||
|
update_sql = "UPDATE %s SET %s %s" % (
|
||||||
table,
|
table,
|
||||||
", ".join("%s = ?" % (k,) for k in updatevalues),
|
", ".join("%s = ?" % (k,) for k in updatevalues),
|
||||||
" AND ".join("%s = ?" % (k,) for k in keyvalues)
|
where,
|
||||||
)
|
)
|
||||||
|
|
||||||
txn.execute(
|
txn.execute(
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
/* Copyright 2016 OpenMarket Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE TABLE federation_stream_position(
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
stream_id INTEGER NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
|
||||||
|
INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
|
|
@ -796,3 +796,19 @@ class StreamStore(SQLBaseStore):
|
||||||
events = yield self._get_events(event_ids)
|
events = yield self._get_events(event_ids)
|
||||||
|
|
||||||
defer.returnValue((upper_bound, events))
|
defer.returnValue((upper_bound, events))
|
||||||
|
|
||||||
|
def get_federation_out_pos(self, typ):
|
||||||
|
return self._simple_select_one_onecol(
|
||||||
|
table="federation_stream_position",
|
||||||
|
retcol="stream_id",
|
||||||
|
keyvalues={"type": typ},
|
||||||
|
desc="get_federation_out_pos"
|
||||||
|
)
|
||||||
|
|
||||||
|
def update_federation_out_pos(self, typ, stream_id):
|
||||||
|
return self._simple_update_one(
|
||||||
|
table="federation_stream_position",
|
||||||
|
keyvalues={"type": typ},
|
||||||
|
updatevalues={"stream_id": stream_id},
|
||||||
|
desc="update_federation_out_pos",
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue