Merge remote-tracking branch 'origin/develop' into markjh/worker_config

Mark Haines 2016-06-16 11:20:17 +01:00
commit f1f70bf4b5
12 changed files with 134 additions and 56 deletions

View File

@@ -1,3 +1,29 @@
+Changes in synapse v0.16.1-rc1 (2016-06-15)
+===========================================
+
+Features: None
+
+Changes:
+
+* Log requester for ``/publicRoom`` endpoints when possible (PR #856)
+* 502 on ``/thumbnail`` when we can't connect to the remote server (PR #862)
+* Linearize fetching of gaps on incoming events (PR #871)
+
+Bug fixes:
+
+* Fix bug where rooms were marked as published by default (PR #857)
+* Fix bug when joining a room with an event with an invalid sender (PR #868)
+* Fix bug where backfilled events were sent down sync streams (PR #869)
+* Fix bug where outgoing connections could wedge indefinitely, causing push
+  notifications to be unreliable (PR #870)
+
+Performance improvements:
+
+* Improve ``/publicRooms`` performance (PR #859)
+
 Changes in synapse v0.16.0 (2016-06-09)
 =======================================
@@ -28,7 +54,7 @@ Bug fixes:
 * Fix bug where synapse sent malformed transactions to AS's when retrying
   transactions (Commits 310197b, 8437906)
 
-Performance Improvements:
+Performance improvements:
 
 * Remove event fetching from DB threads (PR #835)
 * Change the way we cache events (PR #836)

View File

@@ -80,6 +80,7 @@ echo >&2 "Running sytest with PostgreSQL";
     --synapse-directory $WORKSPACE \
     --dendron $WORKSPACE/dendron/bin/dendron \
     --pusher \
+    --synchrotron \
     --port-base $PORT_BASE
 
 cd ..

View File

@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.16.0"
+__version__ = "0.16.1-rc1"

View File

@@ -31,6 +31,9 @@ logger = logging.getLogger(__name__)
 class FederationBase(object):
+    def __init__(self, hs):
+        pass
+
     @defer.inlineCallbacks
     def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
                                        include_none=False):

View File

@@ -52,6 +52,8 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
 class FederationClient(FederationBase):
+    def __init__(self, hs):
+        super(FederationClient, self).__init__(hs)
+
     def start_get_pdu_cache(self):
         self._get_pdu_cache = ExpiringCache(

View File

@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Transaction, Edu
 
+from synapse.util.async import Linearizer
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 
 import synapse.metrics
@@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
 class FederationServer(FederationBase):
+    def __init__(self, hs):
+        super(FederationServer, self).__init__(hs)
+
+        self._room_pdu_linearizer = Linearizer()
+
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
         receipt of new PDUs from other home servers. The required methods are
@@ -491,43 +497,51 @@ class FederationServer(FederationBase):
             pdu.internal_metadata.outlier = True
         elif min_depth and pdu.depth > min_depth:
             if get_missing and prevs - seen:
-                latest = yield self.store.get_latest_event_ids_in_room(
-                    pdu.room_id
-                )
-
-                # We add the prev events that we have seen to the latest
-                # list to ensure the remote server doesn't give them to us
-                latest = set(latest)
-                latest |= seen
-
-                logger.info(
-                    "Missing %d events for room %r: %r...",
-                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
-                )
-
-                missing_events = yield self.get_missing_events(
-                    origin,
-                    pdu.room_id,
-                    earliest_events_ids=list(latest),
-                    latest_events=[pdu],
-                    limit=10,
-                    min_depth=min_depth,
-                )
-
-                # We want to sort these by depth so we process them and
-                # tell clients about them in order.
-                missing_events.sort(key=lambda x: x.depth)
-
-                for e in missing_events:
-                    yield self._handle_new_pdu(
-                        origin,
-                        e,
-                        get_missing=False
-                    )
-
-                have_seen = yield self.store.have_events(
-                    [ev for ev, _ in pdu.prev_events]
-                )
+                # If we're missing stuff, ensure we only fetch stuff one
+                # at a time.
+                with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                    # We recalculate seen, since it may have changed.
+                    have_seen = yield self.store.have_events(prevs)
+                    seen = set(have_seen.keys())
+
+                    if prevs - seen:
+                        latest = yield self.store.get_latest_event_ids_in_room(
+                            pdu.room_id
+                        )
+
+                        # We add the prev events that we have seen to the latest
+                        # list to ensure the remote server doesn't give them to us
+                        latest = set(latest)
+                        latest |= seen
+
+                        logger.info(
+                            "Missing %d events for room %r: %r...",
+                            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                        )
+
+                        missing_events = yield self.get_missing_events(
+                            origin,
+                            pdu.room_id,
+                            earliest_events_ids=list(latest),
+                            latest_events=[pdu],
+                            limit=10,
+                            min_depth=min_depth,
+                        )
+
+                        # We want to sort these by depth so we process them and
+                        # tell clients about them in order.
+                        missing_events.sort(key=lambda x: x.depth)
+
+                        for e in missing_events:
+                            yield self._handle_new_pdu(
+                                origin,
+                                e,
+                                get_missing=False
+                            )
+
+                        have_seen = yield self.store.have_events(
+                            [ev for ev, _ in pdu.prev_events]
+                        )
 
         prevs = {e_id for e_id, _ in pdu.prev_events}
         seen = set(have_seen.keys())
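Editor's note: the linearizer introduced above serializes missing-event fetches per room, so concurrent incoming PDUs for the same room cannot trigger duplicate backfill requests. A minimal sketch of the pattern, as a toy stand-in built on Twisted's DeferredLock (this is not Synapse's actual Linearizer implementation), showing how "with (yield linearizer.queue(room_id)):" admits one caller per key at a time:

from collections import defaultdict

from twisted.internet import defer


class ToyLinearizer(object):
    """Toy per-key linearizer: one DeferredLock per key, released when
    the `with` block exits."""

    def __init__(self):
        self._locks = defaultdict(defer.DeferredLock)

    @defer.inlineCallbacks
    def queue(self, key):
        lock = self._locks[key]
        # acquire() returns a Deferred that fires once the lock is free.
        yield lock.acquire()
        defer.returnValue(_LockContext(lock))


class _LockContext(object):
    def __init__(self, lock):
        self._lock = lock

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release the lock whether or not the block raised.
        self._lock.release()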

View File

@@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer):
         self.hs = hs
 
+        super(ReplicationLayer, self).__init__(hs)
+
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name

View File

@@ -626,6 +626,6 @@ class AuthHandler(BaseHandler):
             Whether self.hash(password) == stored_hash (bool).
         """
         if stored_hash:
-            return bcrypt.hashpw(password, stored_hash) == stored_hash
+            return bcrypt.hashpw(password, stored_hash.encode('utf-8')) == stored_hash
         else:
             return False
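Editor's note: the encode('utf-8') fix matters because bcrypt's hashpw operates on byte strings, while a hash read back from the database may arrive as unicode. A standalone sketch of the fixed comparison (the helper name is ours, for illustration; assumes Python 2, as Synapse was at the time):

import bcrypt

def is_valid_password(password, stored_hash):
    # bcrypt.hashpw() requires byte strings; database drivers may hand
    # back the stored hash as unicode, so normalise before comparing.
    if not stored_hash:
        return False
    if isinstance(stored_hash, unicode):  # Python 2 builtin
        stored_hash = stored_hash.encode('utf-8')
    return bcrypt.hashpw(password, stored_hash) == stored_hash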

View File

@@ -345,19 +345,21 @@
         )
 
         missing_auth = required_auth - set(auth_events)
-        results = yield defer.gatherResults(
-            [
-                self.replication_layer.get_pdu(
-                    [dest],
-                    event_id,
-                    outlier=True,
-                    timeout=10000,
-                )
-                for event_id in missing_auth
-            ],
-            consumeErrors=True
-        ).addErrback(unwrapFirstError)
-        auth_events.update({a.event_id: a for a in results})
+        if missing_auth:
+            logger.info("Missing auth for backfill: %r", missing_auth)
+            results = yield defer.gatherResults(
+                [
+                    self.replication_layer.get_pdu(
+                        [dest],
+                        event_id,
+                        outlier=True,
+                        timeout=10000,
+                    )
+                    for event_id in missing_auth
+                ],
+                consumeErrors=True
+            ).addErrback(unwrapFirstError)
+            auth_events.update({a.event_id: a for a in results})
 
         ev_infos = []
         for a in auth_events.values():
@@ -399,7 +401,7 @@
             # previous to work out the state.
             # TODO: We can probably do something more clever here.
             yield self._handle_new_event(
-                dest, event
+                dest, event, backfilled=True,
             )
 
         defer.returnValue(events)
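Editor's note: the new "if missing_auth:" guard skips an empty defer.gatherResults([]) round trip and logs what is being fetched. For reference, a sketch of the gather-and-unwrap idiom this block relies on (unwrapFirstError is a Synapse helper; the stand-in below only illustrates the behaviour it depends on, namely that consumeErrors=True wraps the first failure in defer.FirstError):

from twisted.internet import defer

def unwrap_first_error(failure):
    # gatherResults(..., consumeErrors=True) wraps the first failing
    # Deferred in a defer.FirstError; surface the original failure so
    # callers see the underlying exception. Used as an errback, e.g.
    # d.addErrback(unwrap_first_error).
    failure.trap(defer.FirstError)
    return failure.value.subFailure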

View File

@@ -24,12 +24,13 @@ from synapse.http.endpoint import SpiderEndpoint
 from canonicaljson import encode_canonical_json
 
-from twisted.internet import defer, reactor, ssl, protocol
+from twisted.internet import defer, reactor, ssl, protocol, task
 from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
 from twisted.web.client import (
     BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent,
-    readBody, FileBodyProducer, PartialDownloadError,
+    readBody, PartialDownloadError,
 )
+from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
 from twisted.web.http import PotentialDataLoss
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
@@ -468,3 +469,26 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
 
     def creatorForNetloc(self, hostname, port):
         return self
+
+
+class FileBodyProducer(TwistedFileBodyProducer):
+    """Workaround for https://twistedmatrix.com/trac/ticket/8473
+
+    We override the pauseProducing and resumeProducing methods in twisted's
+    FileBodyProducer so that they do not raise exceptions if the task has
+    already completed.
+    """
+
+    def pauseProducing(self):
+        try:
+            super(FileBodyProducer, self).pauseProducing()
+        except task.TaskDone:
+            # task has already completed
+            pass
+
+    def resumeProducing(self):
+        try:
+            super(FileBodyProducer, self).resumeProducing()
+        except task.NotPaused:
+            # task was not paused (probably because it had already completed)
+            pass
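Editor's note: the subclass is a drop-in replacement for twisted's producer anywhere a bodyProducer is handed to an Agent. A hedged usage sketch (the endpoint URL and JSON body are illustrative only, not part of this commit):

from StringIO import StringIO

from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

agent = Agent(reactor)
d = agent.request(
    "PUT",
    "http://example.com/some/endpoint",  # illustrative URL
    Headers({"Content-Type": ["application/json"]}),
    FileBodyProducer(StringIO('{"key": "value"}')),  # the subclass above
)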

View File

@@ -252,7 +252,8 @@ class PreviewUrlResource(Resource):
             og = {}
             for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
-                og[tag.attrib['property']] = tag.attrib['content']
+                if 'content' in tag.attrib:
+                    og[tag.attrib['property']] = tag.attrib['content']
 
             # TODO: grab article: meta tags too, e.g.:
@@ -279,7 +280,7 @@ class PreviewUrlResource(Resource):
                 # TODO: consider inlined CSS styles as well as width & height attribs
                 images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
                 images = sorted(images, key=lambda i: (
-                    -1 * int(i.attrib['width']) * int(i.attrib['height'])
+                    -1 * float(i.attrib['width']) * float(i.attrib['height'])
                 ))
                 if not images:
                     images = tree.xpath("//img[@src]")
@@ -287,9 +288,9 @@
                     og['og:image'] = images[0].attrib['src']
 
             # pre-cache the image for posterity
-            # FIXME: it might be cleaner to use the same flow as the main /preview_url request
-            # itself and benefit from the same caching etc. But for now we just rely on the
-            # caching on the master request to speed things up.
+            # FIXME: it might be cleaner to use the same flow as the main /preview_url
+            # request itself and benefit from the same caching etc. But for now we
+            # just rely on the caching on the master request to speed things up.
             if 'og:image' in og and og['og:image']:
                 image_info = yield self._download_url(
                     self._rebase_url(og['og:image'], media_info['uri']), requester.user
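Editor's note on the int-to-float change: width/height attributes scraped from real pages are not always clean integers, and int() raises ValueError on strings like "540.5" where float() succeeds. A small self-contained illustration (the HTML snippet is made up):

from lxml import html

tree = html.fromstring(
    '<html><body><img src="a.png" width="540.5" height="270"></body></html>'
)
img = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")[0]

# int("540.5") would raise ValueError; float() accepts it.
area = -1 * float(img.attrib['width']) * float(img.attrib['height'])
print(area)  # -145935.0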

View File

@@ -22,7 +22,10 @@ Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
 
 def get_domain_from_id(string):
-    return string.split(":", 1)[1]
+    try:
+        return string.split(":", 1)[1]
+    except IndexError:
+        raise SynapseError(400, "Invalid ID: %r" % (string,))
 
 
 class DomainSpecificString(
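Editor's note: illustrative behaviour of the hardened helper, assuming Matrix-style identifiers of the form sigil + localpart + ":" + domain (the example IDs are made up):

get_domain_from_id("@alice:example.com")  # -> "example.com"
get_domain_from_id("!abc123:matrix.org")  # -> "matrix.org"
get_domain_from_id("not-a-matrix-id")     # previously: bare IndexError;
                                          # now: SynapseError(400, ...)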