commit
e731d30d90
|
@ -47,6 +47,7 @@ from synapse.crypto import context_factory
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
from synapse.rest.client.v1 import ClientV1RestResource
|
from synapse.rest.client.v1 import ClientV1RestResource
|
||||||
from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
|
from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
|
||||||
|
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||||
|
|
||||||
from daemonize import Daemonize
|
from daemonize import Daemonize
|
||||||
import twisted.manhole.telnet
|
import twisted.manhole.telnet
|
||||||
|
@ -100,6 +101,12 @@ class SynapseHomeServer(HomeServer):
|
||||||
def build_resource_for_server_key(self):
|
def build_resource_for_server_key(self):
|
||||||
return LocalKey(self)
|
return LocalKey(self)
|
||||||
|
|
||||||
|
def build_resource_for_metrics(self):
|
||||||
|
if self.get_config().enable_metrics:
|
||||||
|
return MetricsResource(self)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def build_db_pool(self):
|
def build_db_pool(self):
|
||||||
return adbapi.ConnectionPool(
|
return adbapi.ConnectionPool(
|
||||||
"sqlite3", self.get_db_name(),
|
"sqlite3", self.get_db_name(),
|
||||||
|
@ -110,7 +117,7 @@ class SynapseHomeServer(HomeServer):
|
||||||
# so that :memory: sqlite works
|
# so that :memory: sqlite works
|
||||||
)
|
)
|
||||||
|
|
||||||
def create_resource_tree(self, web_client, redirect_root_to_web_client):
|
def create_resource_tree(self, redirect_root_to_web_client):
|
||||||
"""Create the resource tree for this Home Server.
|
"""Create the resource tree for this Home Server.
|
||||||
|
|
||||||
This in unduly complicated because Twisted does not support putting
|
This in unduly complicated because Twisted does not support putting
|
||||||
|
@ -122,6 +129,9 @@ class SynapseHomeServer(HomeServer):
|
||||||
location of the web client. This does nothing if web_client is not
|
location of the web client. This does nothing if web_client is not
|
||||||
True.
|
True.
|
||||||
"""
|
"""
|
||||||
|
config = self.get_config()
|
||||||
|
web_client = config.webclient
|
||||||
|
|
||||||
# list containing (path_str, Resource) e.g:
|
# list containing (path_str, Resource) e.g:
|
||||||
# [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ]
|
# [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ]
|
||||||
desired_tree = [
|
desired_tree = [
|
||||||
|
@ -145,6 +155,10 @@ class SynapseHomeServer(HomeServer):
|
||||||
else:
|
else:
|
||||||
self.root_resource = Resource()
|
self.root_resource = Resource()
|
||||||
|
|
||||||
|
metrics_resource = self.get_resource_for_metrics()
|
||||||
|
if config.metrics_port is None and metrics_resource is not None:
|
||||||
|
desired_tree.append((METRICS_PREFIX, metrics_resource))
|
||||||
|
|
||||||
# ideally we'd just use getChild and putChild but getChild doesn't work
|
# ideally we'd just use getChild and putChild but getChild doesn't work
|
||||||
# unless you give it a Request object IN ADDITION to the name :/ So
|
# unless you give it a Request object IN ADDITION to the name :/ So
|
||||||
# instead, we'll store a copy of this mapping so we can actually add
|
# instead, we'll store a copy of this mapping so we can actually add
|
||||||
|
@ -206,17 +220,27 @@ class SynapseHomeServer(HomeServer):
|
||||||
"""
|
"""
|
||||||
return "%s-%s" % (resource, path_seg)
|
return "%s-%s" % (resource, path_seg)
|
||||||
|
|
||||||
def start_listening(self, secure_port, unsecure_port):
|
def start_listening(self):
|
||||||
if secure_port is not None:
|
config = self.get_config()
|
||||||
|
|
||||||
|
if not config.no_tls and config.bind_port is not None:
|
||||||
reactor.listenSSL(
|
reactor.listenSSL(
|
||||||
secure_port, Site(self.root_resource), self.tls_context_factory
|
config.bind_port, Site(self.root_resource), self.tls_context_factory
|
||||||
)
|
)
|
||||||
logger.info("Synapse now listening on port %d", secure_port)
|
logger.info("Synapse now listening on port %d", config.bind_port)
|
||||||
if unsecure_port is not None:
|
|
||||||
|
if config.unsecure_port is not None:
|
||||||
reactor.listenTCP(
|
reactor.listenTCP(
|
||||||
unsecure_port, Site(self.root_resource)
|
config.unsecure_port, Site(self.root_resource)
|
||||||
)
|
)
|
||||||
logger.info("Synapse now listening on port %d", unsecure_port)
|
logger.info("Synapse now listening on port %d", config.unsecure_port)
|
||||||
|
|
||||||
|
metrics_resource = self.get_resource_for_metrics()
|
||||||
|
if metrics_resource and config.metrics_port is not None:
|
||||||
|
reactor.listenTCP(
|
||||||
|
config.metrics_port, Site(metrics_resource), interface="127.0.0.1",
|
||||||
|
)
|
||||||
|
logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
|
||||||
|
|
||||||
|
|
||||||
def get_version_string():
|
def get_version_string():
|
||||||
|
@ -340,7 +364,6 @@ def setup(config_options):
|
||||||
)
|
)
|
||||||
|
|
||||||
hs.create_resource_tree(
|
hs.create_resource_tree(
|
||||||
web_client=config.webclient,
|
|
||||||
redirect_root_to_web_client=True,
|
redirect_root_to_web_client=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -369,11 +392,7 @@ def setup(config_options):
|
||||||
f.namespace['hs'] = hs
|
f.namespace['hs'] = hs
|
||||||
reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
|
reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
|
||||||
|
|
||||||
bind_port = config.bind_port
|
hs.start_listening()
|
||||||
if config.no_tls:
|
|
||||||
bind_port = None
|
|
||||||
|
|
||||||
hs.start_listening(bind_port, config.unsecure_port)
|
|
||||||
|
|
||||||
hs.get_pusherpool().start()
|
hs.get_pusherpool().start()
|
||||||
hs.get_state_handler().start_caching()
|
hs.get_state_handler().start_caching()
|
||||||
|
|
|
@ -23,11 +23,13 @@ from .captcha import CaptchaConfig
|
||||||
from .email import EmailConfig
|
from .email import EmailConfig
|
||||||
from .voip import VoipConfig
|
from .voip import VoipConfig
|
||||||
from .registration import RegistrationConfig
|
from .registration import RegistrationConfig
|
||||||
|
from .metrics import MetricsConfig
|
||||||
|
|
||||||
|
|
||||||
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
|
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
|
||||||
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
|
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
|
||||||
EmailConfig, VoipConfig, RegistrationConfig,):
|
EmailConfig, VoipConfig, RegistrationConfig,
|
||||||
|
MetricsConfig,):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ._base import Config
|
||||||
|
|
||||||
|
|
||||||
|
class MetricsConfig(Config):
|
||||||
|
def __init__(self, args):
|
||||||
|
super(MetricsConfig, self).__init__(args)
|
||||||
|
self.enable_metrics = args.enable_metrics
|
||||||
|
self.metrics_port = args.metrics_port
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def add_arguments(cls, parser):
|
||||||
|
super(MetricsConfig, cls).add_arguments(parser)
|
||||||
|
metrics_group = parser.add_argument_group("metrics")
|
||||||
|
metrics_group.add_argument(
|
||||||
|
'--enable-metrics', dest="enable_metrics", action="store_true",
|
||||||
|
help="Enable collection and rendering of performance metrics"
|
||||||
|
)
|
||||||
|
metrics_group.add_argument(
|
||||||
|
'--metrics-port', metavar="PORT", type=int,
|
||||||
|
help="Separate port to accept metrics requests on (on localhost)"
|
||||||
|
)
|
|
@ -25,6 +25,7 @@ from synapse.api.errors import (
|
||||||
from synapse.util.expiringcache import ExpiringCache
|
from synapse.util.expiringcache import ExpiringCache
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
||||||
|
|
||||||
|
@ -36,9 +37,17 @@ import random
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# synapse.federation.federation_client is a silly name
|
||||||
|
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
|
||||||
|
|
||||||
|
sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
|
||||||
|
|
||||||
|
sent_edus_counter = metrics.register_counter("sent_edus")
|
||||||
|
|
||||||
|
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
|
||||||
|
|
||||||
|
|
||||||
class FederationClient(FederationBase):
|
class FederationClient(FederationBase):
|
||||||
def __init__(self):
|
|
||||||
self._get_pdu_cache = None
|
|
||||||
|
|
||||||
def start_get_pdu_cache(self):
|
def start_get_pdu_cache(self):
|
||||||
self._get_pdu_cache = ExpiringCache(
|
self._get_pdu_cache = ExpiringCache(
|
||||||
|
@ -68,6 +77,8 @@ class FederationClient(FederationBase):
|
||||||
order = self._order
|
order = self._order
|
||||||
self._order += 1
|
self._order += 1
|
||||||
|
|
||||||
|
sent_pdus_destination_dist.inc_by(len(destinations))
|
||||||
|
|
||||||
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
|
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
|
||||||
|
|
||||||
# TODO, add errback, etc.
|
# TODO, add errback, etc.
|
||||||
|
@ -87,6 +98,8 @@ class FederationClient(FederationBase):
|
||||||
content=content,
|
content=content,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
sent_edus_counter.inc()
|
||||||
|
|
||||||
# TODO, add errback, etc.
|
# TODO, add errback, etc.
|
||||||
self._transaction_queue.enqueue_edu(edu)
|
self._transaction_queue.enqueue_edu(edu)
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
@ -113,6 +126,8 @@ class FederationClient(FederationBase):
|
||||||
a Deferred which will eventually yield a JSON object from the
|
a Deferred which will eventually yield a JSON object from the
|
||||||
response
|
response
|
||||||
"""
|
"""
|
||||||
|
sent_queries_counter.inc(query_type)
|
||||||
|
|
||||||
return self.transport_layer.make_query(
|
return self.transport_layer.make_query(
|
||||||
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
|
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
|
||||||
)
|
)
|
||||||
|
|
|
@ -22,6 +22,7 @@ from .units import Transaction, Edu
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.logcontext import PreserveLoggingContext
|
from synapse.util.logcontext import PreserveLoggingContext
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from synapse.api.errors import FederationError, SynapseError
|
from synapse.api.errors import FederationError, SynapseError
|
||||||
|
|
||||||
|
@ -32,6 +33,15 @@ import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# synapse.federation.federation_server is a silly name
|
||||||
|
metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
|
||||||
|
|
||||||
|
received_pdus_counter = metrics.register_counter("received_pdus")
|
||||||
|
|
||||||
|
received_edus_counter = metrics.register_counter("received_edus")
|
||||||
|
|
||||||
|
received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
|
||||||
|
|
||||||
|
|
||||||
class FederationServer(FederationBase):
|
class FederationServer(FederationBase):
|
||||||
def set_handler(self, handler):
|
def set_handler(self, handler):
|
||||||
|
@ -84,6 +94,8 @@ class FederationServer(FederationBase):
|
||||||
def on_incoming_transaction(self, transaction_data):
|
def on_incoming_transaction(self, transaction_data):
|
||||||
transaction = Transaction(**transaction_data)
|
transaction = Transaction(**transaction_data)
|
||||||
|
|
||||||
|
received_pdus_counter.inc_by(len(transaction.pdus))
|
||||||
|
|
||||||
for p in transaction.pdus:
|
for p in transaction.pdus:
|
||||||
if "unsigned" in p:
|
if "unsigned" in p:
|
||||||
unsigned = p["unsigned"]
|
unsigned = p["unsigned"]
|
||||||
|
@ -153,6 +165,8 @@ class FederationServer(FederationBase):
|
||||||
defer.returnValue((200, response))
|
defer.returnValue((200, response))
|
||||||
|
|
||||||
def received_edu(self, origin, edu_type, content):
|
def received_edu(self, origin, edu_type, content):
|
||||||
|
received_edus_counter.inc()
|
||||||
|
|
||||||
if edu_type in self.edu_handlers:
|
if edu_type in self.edu_handlers:
|
||||||
self.edu_handlers[edu_type](origin, content)
|
self.edu_handlers[edu_type](origin, content)
|
||||||
else:
|
else:
|
||||||
|
@ -204,6 +218,8 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_query_request(self, query_type, args):
|
def on_query_request(self, query_type, args):
|
||||||
|
received_queries_counter.inc(query_type)
|
||||||
|
|
||||||
if query_type in self.query_handlers:
|
if query_type in self.query_handlers:
|
||||||
response = yield self.query_handlers[query_type](args)
|
response = yield self.query_handlers[query_type](args)
|
||||||
defer.returnValue((200, response))
|
defer.returnValue((200, response))
|
||||||
|
|
|
@ -25,12 +25,15 @@ from synapse.util.logcontext import PreserveLoggingContext
|
||||||
from synapse.util.retryutils import (
|
from synapse.util.retryutils import (
|
||||||
get_retry_limiter, NotRetryingDestination,
|
get_retry_limiter, NotRetryingDestination,
|
||||||
)
|
)
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TransactionQueue(object):
|
class TransactionQueue(object):
|
||||||
"""This class makes sure we only have one transaction in flight at
|
"""This class makes sure we only have one transaction in flight at
|
||||||
|
@ -54,11 +57,25 @@ class TransactionQueue(object):
|
||||||
# done
|
# done
|
||||||
self.pending_transactions = {}
|
self.pending_transactions = {}
|
||||||
|
|
||||||
|
metrics.register_callback(
|
||||||
|
"pending_destinations",
|
||||||
|
lambda: len(self.pending_transactions),
|
||||||
|
)
|
||||||
|
|
||||||
# Is a mapping from destination -> list of
|
# Is a mapping from destination -> list of
|
||||||
# tuple(pending pdus, deferred, order)
|
# tuple(pending pdus, deferred, order)
|
||||||
self.pending_pdus_by_dest = {}
|
self.pending_pdus_by_dest = pdus = {}
|
||||||
# destination -> list of tuple(edu, deferred)
|
# destination -> list of tuple(edu, deferred)
|
||||||
self.pending_edus_by_dest = {}
|
self.pending_edus_by_dest = edus = {}
|
||||||
|
|
||||||
|
metrics.register_callback(
|
||||||
|
"pending_pdus",
|
||||||
|
lambda: sum(map(len, pdus.values())),
|
||||||
|
)
|
||||||
|
metrics.register_callback(
|
||||||
|
"pending_edus",
|
||||||
|
lambda: sum(map(len, edus.values())),
|
||||||
|
)
|
||||||
|
|
||||||
# destination -> list of tuple(failure, deferred)
|
# destination -> list of tuple(failure, deferred)
|
||||||
self.pending_failures_by_dest = {}
|
self.pending_failures_by_dest = {}
|
||||||
|
|
|
@ -148,6 +148,10 @@ class BaseFederationServlet(object):
|
||||||
logger.exception("authenticate_request failed")
|
logger.exception("authenticate_request failed")
|
||||||
raise
|
raise
|
||||||
defer.returnValue(response)
|
defer.returnValue(response)
|
||||||
|
|
||||||
|
# Extra logic that functools.wraps() doesn't finish
|
||||||
|
new_code.__self__ = code.__self__
|
||||||
|
|
||||||
return new_code
|
return new_code
|
||||||
|
|
||||||
def register(self, server):
|
def register(self, server):
|
||||||
|
|
|
@ -21,6 +21,7 @@ from synapse.api.constants import PresenceState
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.logcontext import PreserveLoggingContext
|
from synapse.util.logcontext import PreserveLoggingContext
|
||||||
from synapse.types import UserID
|
from synapse.types import UserID
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
|
||||||
|
@ -29,6 +30,8 @@ import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
|
||||||
# TODO(paul): Maybe there's one of these I can steal from somewhere
|
# TODO(paul): Maybe there's one of these I can steal from somewhere
|
||||||
def partition(l, func):
|
def partition(l, func):
|
||||||
|
@ -133,6 +136,11 @@ class PresenceHandler(BaseHandler):
|
||||||
self._user_cachemap = {}
|
self._user_cachemap = {}
|
||||||
self._user_cachemap_latest_serial = 0
|
self._user_cachemap_latest_serial = 0
|
||||||
|
|
||||||
|
metrics.register_callback(
|
||||||
|
"userCachemap:size",
|
||||||
|
lambda: len(self._user_cachemap),
|
||||||
|
)
|
||||||
|
|
||||||
def _get_or_make_usercache(self, user):
|
def _get_or_make_usercache(self, user):
|
||||||
"""If the cache entry doesn't exist, initialise a new one."""
|
"""If the cache entry doesn't exist, initialise a new one."""
|
||||||
if user not in self._user_cachemap:
|
if user not in self._user_cachemap:
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
from synapse.api.errors import CodeMessageException
|
from synapse.api.errors import CodeMessageException
|
||||||
from syutil.jsonutil import encode_canonical_json
|
from syutil.jsonutil import encode_canonical_json
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from twisted.web.client import (
|
from twisted.web.client import (
|
||||||
|
@ -31,6 +32,17 @@ import urllib
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
outgoing_requests_counter = metrics.register_counter(
|
||||||
|
"requests",
|
||||||
|
labels=["method"],
|
||||||
|
)
|
||||||
|
incoming_responses_counter = metrics.register_counter(
|
||||||
|
"responses",
|
||||||
|
labels=["method", "code"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SimpleHttpClient(object):
|
class SimpleHttpClient(object):
|
||||||
"""
|
"""
|
||||||
|
@ -45,12 +57,30 @@ class SimpleHttpClient(object):
|
||||||
self.agent = Agent(reactor)
|
self.agent = Agent(reactor)
|
||||||
self.version_string = hs.version_string
|
self.version_string = hs.version_string
|
||||||
|
|
||||||
|
def request(self, method, *args, **kwargs):
|
||||||
|
# A small wrapper around self.agent.request() so we can easily attach
|
||||||
|
# counters to it
|
||||||
|
outgoing_requests_counter.inc(method)
|
||||||
|
d = self.agent.request(method, *args, **kwargs)
|
||||||
|
|
||||||
|
def _cb(response):
|
||||||
|
incoming_responses_counter.inc(method, response.code)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _eb(failure):
|
||||||
|
incoming_responses_counter.inc(method, "ERR")
|
||||||
|
return failure
|
||||||
|
|
||||||
|
d.addCallbacks(_cb, _eb)
|
||||||
|
|
||||||
|
return d
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def post_urlencoded_get_json(self, uri, args={}):
|
def post_urlencoded_get_json(self, uri, args={}):
|
||||||
logger.debug("post_urlencoded_get_json args: %s", args)
|
logger.debug("post_urlencoded_get_json args: %s", args)
|
||||||
query_bytes = urllib.urlencode(args, True)
|
query_bytes = urllib.urlencode(args, True)
|
||||||
|
|
||||||
response = yield self.agent.request(
|
response = yield self.request(
|
||||||
"POST",
|
"POST",
|
||||||
uri.encode("ascii"),
|
uri.encode("ascii"),
|
||||||
headers=Headers({
|
headers=Headers({
|
||||||
|
@ -70,7 +100,7 @@ class SimpleHttpClient(object):
|
||||||
|
|
||||||
logger.info("HTTP POST %s -> %s", json_str, uri)
|
logger.info("HTTP POST %s -> %s", json_str, uri)
|
||||||
|
|
||||||
response = yield self.agent.request(
|
response = yield self.request(
|
||||||
"POST",
|
"POST",
|
||||||
uri.encode("ascii"),
|
uri.encode("ascii"),
|
||||||
headers=Headers({
|
headers=Headers({
|
||||||
|
@ -104,7 +134,7 @@ class SimpleHttpClient(object):
|
||||||
query_bytes = urllib.urlencode(args, True)
|
query_bytes = urllib.urlencode(args, True)
|
||||||
uri = "%s?%s" % (uri, query_bytes)
|
uri = "%s?%s" % (uri, query_bytes)
|
||||||
|
|
||||||
response = yield self.agent.request(
|
response = yield self.request(
|
||||||
"GET",
|
"GET",
|
||||||
uri.encode("ascii"),
|
uri.encode("ascii"),
|
||||||
headers=Headers({
|
headers=Headers({
|
||||||
|
@ -145,7 +175,7 @@ class SimpleHttpClient(object):
|
||||||
|
|
||||||
json_str = encode_canonical_json(json_body)
|
json_str = encode_canonical_json(json_body)
|
||||||
|
|
||||||
response = yield self.agent.request(
|
response = yield self.request(
|
||||||
"PUT",
|
"PUT",
|
||||||
uri.encode("ascii"),
|
uri.encode("ascii"),
|
||||||
headers=Headers({
|
headers=Headers({
|
||||||
|
@ -176,7 +206,7 @@ class CaptchaServerHttpClient(SimpleHttpClient):
|
||||||
def post_urlencoded_get_raw(self, url, args={}):
|
def post_urlencoded_get_raw(self, url, args={}):
|
||||||
query_bytes = urllib.urlencode(args, True)
|
query_bytes = urllib.urlencode(args, True)
|
||||||
|
|
||||||
response = yield self.agent.request(
|
response = yield self.request(
|
||||||
"POST",
|
"POST",
|
||||||
url.encode("ascii"),
|
url.encode("ascii"),
|
||||||
bodyProducer=FileBodyProducer(StringIO(query_bytes)),
|
bodyProducer=FileBodyProducer(StringIO(query_bytes)),
|
||||||
|
|
|
@ -23,6 +23,7 @@ from twisted.web._newclient import ResponseDone
|
||||||
from synapse.http.endpoint import matrix_federation_endpoint
|
from synapse.http.endpoint import matrix_federation_endpoint
|
||||||
from synapse.util.async import sleep
|
from synapse.util.async import sleep
|
||||||
from synapse.util.logcontext import PreserveLoggingContext
|
from synapse.util.logcontext import PreserveLoggingContext
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from syutil.jsonutil import encode_canonical_json
|
from syutil.jsonutil import encode_canonical_json
|
||||||
|
|
||||||
|
@ -40,6 +41,17 @@ import urlparse
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
outgoing_requests_counter = metrics.register_counter(
|
||||||
|
"requests",
|
||||||
|
labels=["method"],
|
||||||
|
)
|
||||||
|
incoming_responses_counter = metrics.register_counter(
|
||||||
|
"responses",
|
||||||
|
labels=["method", "code"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MatrixFederationHttpAgent(_AgentBase):
|
class MatrixFederationHttpAgent(_AgentBase):
|
||||||
|
|
||||||
|
@ -49,6 +61,8 @@ class MatrixFederationHttpAgent(_AgentBase):
|
||||||
def request(self, destination, endpoint, method, path, params, query,
|
def request(self, destination, endpoint, method, path, params, query,
|
||||||
headers, body_producer):
|
headers, body_producer):
|
||||||
|
|
||||||
|
outgoing_requests_counter.inc(method)
|
||||||
|
|
||||||
host = b""
|
host = b""
|
||||||
port = 0
|
port = 0
|
||||||
fragment = b""
|
fragment = b""
|
||||||
|
@ -59,10 +73,22 @@ class MatrixFederationHttpAgent(_AgentBase):
|
||||||
# Set the connection pool key to be the destination.
|
# Set the connection pool key to be the destination.
|
||||||
key = destination
|
key = destination
|
||||||
|
|
||||||
return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
|
d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
|
||||||
headers, body_producer,
|
headers, body_producer,
|
||||||
parsed_URI.originForm)
|
parsed_URI.originForm)
|
||||||
|
|
||||||
|
def _cb(response):
|
||||||
|
incoming_responses_counter.inc(method, response.code)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _eb(failure):
|
||||||
|
incoming_responses_counter.inc(method, "ERR")
|
||||||
|
return failure
|
||||||
|
|
||||||
|
d.addCallbacks(_cb, _eb)
|
||||||
|
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
class MatrixFederationHttpClient(object):
|
class MatrixFederationHttpClient(object):
|
||||||
"""HTTP client used to talk to other homeservers over the federation
|
"""HTTP client used to talk to other homeservers over the federation
|
||||||
|
|
|
@ -18,6 +18,7 @@ from synapse.api.errors import (
|
||||||
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
|
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
|
||||||
)
|
)
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from syutil.jsonutil import (
|
from syutil.jsonutil import (
|
||||||
encode_canonical_json, encode_pretty_printed_json
|
encode_canonical_json, encode_pretty_printed_json
|
||||||
|
@ -34,6 +35,17 @@ import urllib
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
incoming_requests_counter = metrics.register_counter(
|
||||||
|
"requests",
|
||||||
|
labels=["method", "servlet"],
|
||||||
|
)
|
||||||
|
outgoing_responses_counter = metrics.register_counter(
|
||||||
|
"responses",
|
||||||
|
labels=["method", "code"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class HttpServer(object):
|
class HttpServer(object):
|
||||||
""" Interface for registering callbacks on a HTTP server
|
""" Interface for registering callbacks on a HTTP server
|
||||||
|
@ -131,6 +143,15 @@ class JsonResource(HttpServer, resource.Resource):
|
||||||
# returned response. We pass both the request and any
|
# returned response. We pass both the request and any
|
||||||
# matched groups from the regex to the callback.
|
# matched groups from the regex to the callback.
|
||||||
|
|
||||||
|
callback = path_entry.callback
|
||||||
|
|
||||||
|
servlet_instance = getattr(callback, "__self__", None)
|
||||||
|
if servlet_instance is not None:
|
||||||
|
servlet_classname = servlet_instance.__class__.__name__
|
||||||
|
else:
|
||||||
|
servlet_classname = "%r" % callback
|
||||||
|
incoming_requests_counter.inc(request.method, servlet_classname)
|
||||||
|
|
||||||
args = [
|
args = [
|
||||||
urllib.unquote(u).decode("UTF-8") for u in m.groups()
|
urllib.unquote(u).decode("UTF-8") for u in m.groups()
|
||||||
]
|
]
|
||||||
|
@ -140,10 +161,7 @@ class JsonResource(HttpServer, resource.Resource):
|
||||||
request.method, request.path
|
request.method, request.path
|
||||||
)
|
)
|
||||||
|
|
||||||
code, response = yield path_entry.callback(
|
code, response = yield callback(request, *args)
|
||||||
request,
|
|
||||||
*args
|
|
||||||
)
|
|
||||||
|
|
||||||
self._send_response(request, code, response)
|
self._send_response(request, code, response)
|
||||||
return
|
return
|
||||||
|
@ -190,6 +208,8 @@ class JsonResource(HttpServer, resource.Resource):
|
||||||
request)
|
request)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
outgoing_responses_counter.inc(request.method, str(code))
|
||||||
|
|
||||||
# TODO: Only enable CORS for the requests that need it.
|
# TODO: Only enable CORS for the requests that need it.
|
||||||
respond_with_json(
|
respond_with_json(
|
||||||
request, code, response_json_object,
|
request, code, response_json_object,
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# Because otherwise 'resource' collides with synapse.metrics.resource
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from resource import getrusage, getpagesize, RUSAGE_SELF
|
||||||
|
|
||||||
|
from .metric import (
|
||||||
|
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# We'll keep all the available metrics in a single toplevel dict, one shared
|
||||||
|
# for the entire process. We don't currently support per-HomeServer instances
|
||||||
|
# of metrics, because in practice any one python VM will host only one
|
||||||
|
# HomeServer anyway. This makes a lot of implementation neater
|
||||||
|
all_metrics = {}
|
||||||
|
|
||||||
|
|
||||||
|
class Metrics(object):
|
||||||
|
""" A single Metrics object gives a (mutable) slice view of the all_metrics
|
||||||
|
dict, allowing callers to easily register new metrics that are namespaced
|
||||||
|
nicely."""
|
||||||
|
|
||||||
|
def __init__(self, name):
|
||||||
|
self.name_prefix = name
|
||||||
|
|
||||||
|
def _register(self, metric_class, name, *args, **kwargs):
|
||||||
|
full_name = "%s_%s" % (self.name_prefix, name)
|
||||||
|
|
||||||
|
metric = metric_class(full_name, *args, **kwargs)
|
||||||
|
|
||||||
|
all_metrics[full_name] = metric
|
||||||
|
return metric
|
||||||
|
|
||||||
|
def register_counter(self, *args, **kwargs):
|
||||||
|
return self._register(CounterMetric, *args, **kwargs)
|
||||||
|
|
||||||
|
def register_callback(self, *args, **kwargs):
|
||||||
|
return self._register(CallbackMetric, *args, **kwargs)
|
||||||
|
|
||||||
|
def register_distribution(self, *args, **kwargs):
|
||||||
|
return self._register(DistributionMetric, *args, **kwargs)
|
||||||
|
|
||||||
|
def register_cache(self, *args, **kwargs):
|
||||||
|
return self._register(CacheMetric, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def get_metrics_for(pkg_name):
|
||||||
|
""" Returns a Metrics instance for conveniently creating metrics
|
||||||
|
namespaced with the given name prefix. """
|
||||||
|
|
||||||
|
# Convert a "package.name" to "package_name" because Prometheus doesn't
|
||||||
|
# let us use . in metric names
|
||||||
|
return Metrics(pkg_name.replace(".", "_"))
|
||||||
|
|
||||||
|
|
||||||
|
def render_all():
    """Render every registered metric into one newline-joined text document.

    A metric that fails to render is reported inline as a comment (and
    logged) rather than aborting the whole page.
    """
    # TODO(paul): Internal hack
    update_resource_metrics()

    lines = []
    for name in sorted(all_metrics):
        try:
            lines.extend(all_metrics[name].render())
        except Exception:
            lines.append("# FAILED to render %s" % name)
            logger.exception("Failed to render %s metric", name)

    lines.append("")  # to generate a final CRLF

    return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
# Now register some standard process-wide state metrics, to give indications of
# process resource usage

# Most recent getrusage() snapshot; refreshed by update_resource_metrics().
rusage = None
# Retained for backwards compatibility; no longer used for maxrss (see below).
PAGE_SIZE = getpagesize()


def update_resource_metrics():
    """Refresh the cached resource-usage snapshot read by the callbacks below."""
    global rusage
    rusage = getrusage(RUSAGE_SELF)


resource_metrics = get_metrics_for("process.resource")

# msecs
resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000)
resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000)

# bytes. On Linux ru_maxrss is reported in kilobytes (see getrusage(2)), not
# pages, so scale by 1024 rather than by the page size.
resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * 1024)
|
|
@ -0,0 +1,155 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
from itertools import chain
|
||||||
|
|
||||||
|
|
||||||
|
# TODO(paul): I can't believe Python doesn't have one of these
def map_concat(func, items):
    """Apply *func* to every item and concatenate the resulting lists."""
    result = []
    for item in items:
        result.extend(func(item))
    return result
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMetric(object):
    """Base class for the concrete metric types.

    Holds the metric name and its label names, and provides rendering
    helpers for the Prometheus text exposition format.

    NOTE(review): ``render`` reads ``self.counts`` and ``self.render_item``,
    which this base class does not define — subclasses must provide them
    (or override ``render`` entirely, as CallbackMetric does).
    """

    def __init__(self, name, labels=None):
        self.name = name
        # Default to None instead of a shared mutable [] default argument;
        # substitute a fresh empty list per instance.
        self.labels = labels if labels is not None else []

    def dimension(self):
        """Return the number of labels this metric carries."""
        return len(self.labels)

    def is_scalar(self):
        """Return True if this metric has no labels."""
        return not len(self.labels)

    def _render_labelvalue(self, value):
        # TODO: some kind of value escape
        return '"%s"' % (value)

    def _render_key(self, values):
        """Render the '{label="value",...}' suffix for one label-value tuple.

        Returns the empty string for scalar (label-less) metrics.
        """
        if self.is_scalar():
            return ""
        return "{%s}" % (
            ",".join(["%s=%s" % (k, self._render_labelvalue(v))
                      for k, v in zip(self.labels, values)])
        )

    def render(self):
        # Renders one line per label-value tuple, sorted for stable output.
        return map_concat(self.render_item, sorted(self.counts.keys()))
|
||||||
|
|
||||||
|
|
||||||
|
class CounterMetric(BaseMetric):
    """The simplest kind of metric; one that stores a monotonically-increasing
    integer that counts events."""

    def __init__(self, *args, **kwargs):
        super(CounterMetric, self).__init__(*args, **kwargs)

        # Maps label-value tuples to the running count for that combination.
        self.counts = {}

        # Scalar metrics are never empty
        if self.is_scalar():
            self.counts[()] = 0

    def inc_by(self, incr, *values):
        """Add *incr* to the count for the given label values."""
        if len(values) != self.dimension():
            raise ValueError(
                "Expected as many values to inc() as labels (%d)" % (self.dimension())
            )

        # TODO: should assert that the tag values are all strings

        self.counts[values] = self.counts.get(values, 0) + incr

    def inc(self, *values):
        """Increment the count for the given label values by one."""
        self.inc_by(1, *values)

    def render_item(self, k):
        return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
|
||||||
|
|
||||||
|
|
||||||
|
class CallbackMetric(BaseMetric):
    """A metric that returns the numeric value returned by a callback whenever
    it is rendered. Typically this is used to implement gauges that yield the
    size or other state of some in-memory object by actively querying it."""

    def __init__(self, name, callback, labels=[]):
        super(CallbackMetric, self).__init__(name, labels=labels)

        # Invoked afresh on every render. For scalar metrics it returns a
        # number; for labelled metrics, a dict of label-value tuple -> number.
        self.callback = callback

    def render(self):
        value = self.callback()

        if self.is_scalar():
            return ["%s %d" % (self.name, value)]

        keys = sorted(value.keys())
        return ["%s%s %d" % (self.name, self._render_key(k), value[k])
                for k in keys]
|
||||||
|
|
||||||
|
|
||||||
|
class DistributionMetric(object):
    """A combination of an event counter and an accumulator, which counts
    both the number of events and accumulates the total value. Typically this
    could be used to keep track of method-running times, or other distributions
    of values that occur in discrete occurances.

    TODO(paul): Try to export some heatmap-style stats?
    """

    def __init__(self, name, *args, **kwargs):
        # Forward extra positional arguments to the underlying counters too;
        # the previous version accepted *args but silently discarded them, so
        # a positionally-passed `labels` list was dropped on the floor.
        self.counts = CounterMetric(name + ":count", *args, **kwargs)
        self.totals = CounterMetric(name + ":total", *args, **kwargs)

    def inc_by(self, inc, *values):
        """Record one event of size *inc* against the given label values."""
        self.counts.inc(*values)
        self.totals.inc_by(inc, *values)

    def render(self):
        return self.counts.render() + self.totals.render()
|
||||||
|
|
||||||
|
|
||||||
|
class CacheMetric(object):
    """A combination of two CounterMetrics, one to count cache hits and one to
    count a total, and a callback metric to yield the current size.

    This metric generates standard metric name pairs, so that monitoring rules
    can easily be applied to measure hit ratio."""

    def __init__(self, name, size_callback, labels=[]):
        self.name = name

        # Hits and total share the label set so the series line up for ratio
        # calculations.
        self.hits = CounterMetric(name + ":hits", labels=labels)
        self.total = CounterMetric(name + ":total", labels=labels)

        # Size is queried live from the cache itself via the callback.
        self.size = CallbackMetric(
            name + ":size", callback=size_callback, labels=labels,
        )

    def inc_hits(self, *values):
        # A hit counts towards the total as well.
        self.hits.inc(*values)
        self.total.inc(*values)

    def inc_misses(self, *values):
        self.total.inc(*values)

    def render(self):
        rendered = self.hits.render()
        rendered += self.total.render()
        rendered += self.size.render()
        return rendered
|
|
@ -0,0 +1,39 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
|
|
||||||
|
# URL path prefix under which the metrics page is mounted on the homeserver.
METRICS_PREFIX = "/_synapse/metrics"
|
||||||
|
|
||||||
|
|
||||||
|
class MetricsResource(Resource):
    """Twisted web resource that serves the rendered process metrics page."""

    isLeaf = True

    def __init__(self, hs):
        Resource.__init__(self)  # Resource is old-style, so no super()

        self.hs = hs

    def render_GET(self, request):
        body = synapse.metrics.render_all()

        request.setHeader("Content-Type", "text/plain")
        request.setHeader("Content-Length", str(len(body)))

        # Encode as UTF-8 (default)
        return body.encode()
|
|
@ -19,12 +19,27 @@ from synapse.util.logutils import log_function
|
||||||
from synapse.util.logcontext import PreserveLoggingContext
|
from synapse.util.logcontext import PreserveLoggingContext
|
||||||
from synapse.util.async import run_on_reactor
|
from synapse.util.async import run_on_reactor
|
||||||
from synapse.types import StreamToken
|
from synapse.types import StreamToken
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
notified_events_counter = metrics.register_counter("notified_events")
|
||||||
|
|
||||||
|
|
||||||
|
# TODO(paul): Should be shared somewhere
def count(func, l):
    """Return the number of items in l for which func returns true."""
    # sum() over a generator replaces the manual accumulator loop.
    return sum(1 for x in l if func(x))
|
||||||
|
|
||||||
|
|
||||||
class _NotificationListener(object):
|
class _NotificationListener(object):
|
||||||
""" This represents a single client connection to the events stream.
|
""" This represents a single client connection to the events stream.
|
||||||
|
@ -59,6 +74,7 @@ class _NotificationListener(object):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.deferred.callback(result)
|
self.deferred.callback(result)
|
||||||
|
notified_events_counter.inc_by(len(events))
|
||||||
except defer.AlreadyCalledError:
|
except defer.AlreadyCalledError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -95,6 +111,35 @@ class Notifier(object):
|
||||||
"user_joined_room", self._user_joined_room
|
"user_joined_room", self._user_joined_room
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# This is not a very cheap test to perform, but it's only executed
|
||||||
|
# when rendering the metrics page, which is likely once per minute at
|
||||||
|
# most when scraping it.
|
||||||
|
def count_listeners():
|
||||||
|
all_listeners = set()
|
||||||
|
|
||||||
|
for x in self.room_to_listeners.values():
|
||||||
|
all_listeners |= x
|
||||||
|
for x in self.user_to_listeners.values():
|
||||||
|
all_listeners |= x
|
||||||
|
for x in self.appservice_to_listeners.values():
|
||||||
|
all_listeners |= x
|
||||||
|
|
||||||
|
return len(all_listeners)
|
||||||
|
metrics.register_callback("listeners", count_listeners)
|
||||||
|
|
||||||
|
metrics.register_callback(
|
||||||
|
"rooms",
|
||||||
|
lambda: count(bool, self.room_to_listeners.values()),
|
||||||
|
)
|
||||||
|
metrics.register_callback(
|
||||||
|
"users",
|
||||||
|
lambda: count(bool, self.user_to_listeners.values()),
|
||||||
|
)
|
||||||
|
metrics.register_callback(
|
||||||
|
"appservices",
|
||||||
|
lambda: count(bool, self.appservice_to_listeners.values()),
|
||||||
|
)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_new_room_event(self, event, extra_users=[]):
|
def on_new_room_event(self, event, extra_users=[]):
|
||||||
|
|
|
@ -56,6 +56,7 @@ class BaseHomeServer(object):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
DEPENDENCIES = [
|
DEPENDENCIES = [
|
||||||
|
'config',
|
||||||
'clock',
|
'clock',
|
||||||
'http_client',
|
'http_client',
|
||||||
'db_name',
|
'db_name',
|
||||||
|
@ -79,6 +80,7 @@ class BaseHomeServer(object):
|
||||||
'resource_for_server_key',
|
'resource_for_server_key',
|
||||||
'resource_for_media_repository',
|
'resource_for_media_repository',
|
||||||
'resource_for_app_services',
|
'resource_for_app_services',
|
||||||
|
'resource_for_metrics',
|
||||||
'event_sources',
|
'event_sources',
|
||||||
'ratelimiter',
|
'ratelimiter',
|
||||||
'keyring',
|
'keyring',
|
||||||
|
|
|
@ -20,6 +20,7 @@ from synapse.events.utils import prune_event
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
|
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
|
||||||
from synapse.util.lrucache import LruCache
|
from synapse.util.lrucache import LruCache
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
@ -35,9 +36,22 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
|
||||||
transaction_logger = logging.getLogger("synapse.storage.txn")
|
transaction_logger = logging.getLogger("synapse.storage.txn")
|
||||||
|
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for("synapse.storage")
|
||||||
|
|
||||||
|
sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
|
||||||
|
sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
|
||||||
|
sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"])
|
||||||
|
|
||||||
|
caches_by_name = {}
|
||||||
|
cache_counter = metrics.register_cache(
|
||||||
|
"cache",
|
||||||
|
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
|
||||||
|
labels=["name"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO(paul):
|
# TODO(paul):
|
||||||
# * more generic key management
|
# * more generic key management
|
||||||
# * export monitoring stats
|
|
||||||
# * consider other eviction strategies - LRU?
|
# * consider other eviction strategies - LRU?
|
||||||
def cached(max_entries=1000):
|
def cached(max_entries=1000):
|
||||||
""" A method decorator that applies a memoizing cache around the function.
|
""" A method decorator that applies a memoizing cache around the function.
|
||||||
|
@ -55,6 +69,9 @@ def cached(max_entries=1000):
|
||||||
"""
|
"""
|
||||||
def wrap(orig):
|
def wrap(orig):
|
||||||
cache = OrderedDict()
|
cache = OrderedDict()
|
||||||
|
name = orig.__name__
|
||||||
|
|
||||||
|
caches_by_name[name] = cache
|
||||||
|
|
||||||
def prefill(key, value):
|
def prefill(key, value):
|
||||||
while len(cache) > max_entries:
|
while len(cache) > max_entries:
|
||||||
|
@ -65,8 +82,10 @@ def cached(max_entries=1000):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def wrapped(self, key):
|
def wrapped(self, key):
|
||||||
if key in cache:
|
if key in cache:
|
||||||
|
cache_counter.inc_hits(name)
|
||||||
defer.returnValue(cache[key])
|
defer.returnValue(cache[key])
|
||||||
|
|
||||||
|
cache_counter.inc_misses(name)
|
||||||
ret = yield orig(self, key)
|
ret = yield orig(self, key)
|
||||||
prefill(key, ret)
|
prefill(key, ret)
|
||||||
defer.returnValue(ret)
|
defer.returnValue(ret)
|
||||||
|
@ -83,7 +102,8 @@ def cached(max_entries=1000):
|
||||||
|
|
||||||
class LoggingTransaction(object):
|
class LoggingTransaction(object):
|
||||||
"""An object that almost-transparently proxies for the 'txn' object
|
"""An object that almost-transparently proxies for the 'txn' object
|
||||||
passed to the constructor. Adds logging to the .execute() method."""
|
passed to the constructor. Adds logging and metrics to the .execute()
|
||||||
|
method."""
|
||||||
__slots__ = ["txn", "name"]
|
__slots__ = ["txn", "name"]
|
||||||
|
|
||||||
def __init__(self, txn, name):
|
def __init__(self, txn, name):
|
||||||
|
@ -99,6 +119,7 @@ class LoggingTransaction(object):
|
||||||
def execute(self, sql, *args, **kwargs):
|
def execute(self, sql, *args, **kwargs):
|
||||||
# TODO(paul): Maybe use 'info' and 'debug' for values?
|
# TODO(paul): Maybe use 'info' and 'debug' for values?
|
||||||
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
|
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if args and args[0]:
|
if args and args[0]:
|
||||||
values = args[0]
|
values = args[0]
|
||||||
|
@ -120,8 +141,9 @@ class LoggingTransaction(object):
|
||||||
logger.exception("[SQL FAIL] {%s}", self.name)
|
logger.exception("[SQL FAIL] {%s}", self.name)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
end = time.time() * 1000
|
msecs = (time.time() * 1000) - start
|
||||||
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
|
sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
|
||||||
|
sql_query_timer.inc_by(msecs, sql.split()[0])
|
||||||
|
|
||||||
|
|
||||||
class PerformanceCounters(object):
|
class PerformanceCounters(object):
|
||||||
|
@ -172,11 +194,18 @@ class SQLBaseStore(object):
|
||||||
self._previous_txn_total_time = 0
|
self._previous_txn_total_time = 0
|
||||||
self._current_txn_total_time = 0
|
self._current_txn_total_time = 0
|
||||||
self._previous_loop_ts = 0
|
self._previous_loop_ts = 0
|
||||||
|
|
||||||
|
# TODO(paul): These can eventually be removed once the metrics code
|
||||||
|
# is running in mainline, and we have some nice monitoring frontends
|
||||||
|
# to watch it
|
||||||
self._txn_perf_counters = PerformanceCounters()
|
self._txn_perf_counters = PerformanceCounters()
|
||||||
self._get_event_counters = PerformanceCounters()
|
self._get_event_counters = PerformanceCounters()
|
||||||
|
|
||||||
self._get_event_cache = LruCache(hs.config.event_cache_size)
|
self._get_event_cache = LruCache(hs.config.event_cache_size)
|
||||||
|
|
||||||
|
# Pretend the getEventCache is just another named cache
|
||||||
|
caches_by_name["*getEvent*"] = self._get_event_cache
|
||||||
|
|
||||||
def start_profiling(self):
|
def start_profiling(self):
|
||||||
self._previous_loop_ts = self._clock.time_msec()
|
self._previous_loop_ts = self._clock.time_msec()
|
||||||
|
|
||||||
|
@ -231,13 +260,13 @@ class SQLBaseStore(object):
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
end = time.time() * 1000
|
end = time.time() * 1000
|
||||||
transaction_logger.debug(
|
duration = end - start
|
||||||
"[TXN END] {%s} %f",
|
|
||||||
name, end - start
|
|
||||||
)
|
|
||||||
|
|
||||||
self._current_txn_total_time += end - start
|
transaction_logger.debug("[TXN END] {%s} %f", name, duration)
|
||||||
|
|
||||||
|
self._current_txn_total_time += duration
|
||||||
self._txn_perf_counters.update(desc, start, end)
|
self._txn_perf_counters.update(desc, start, end)
|
||||||
|
sql_txn_timer.inc_by(duration, desc)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
result = yield self._db_pool.runInteraction(
|
result = yield self._db_pool.runInteraction(
|
||||||
|
@ -638,14 +667,22 @@ class SQLBaseStore(object):
|
||||||
get_prev_content=False, allow_rejected=False):
|
get_prev_content=False, allow_rejected=False):
|
||||||
|
|
||||||
start_time = time.time() * 1000
|
start_time = time.time() * 1000
|
||||||
update_counter = self._get_event_counters.update
|
|
||||||
|
def update_counter(desc, last_time):
|
||||||
|
curr_time = self._get_event_counters.update(desc, last_time)
|
||||||
|
sql_getevents_timer.inc_by(curr_time - last_time, desc)
|
||||||
|
return curr_time
|
||||||
|
|
||||||
cache = self._get_event_cache.setdefault(event_id, {})
|
cache = self._get_event_cache.setdefault(event_id, {})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Separate cache entries for each way to invoke _get_event_txn
|
# Separate cache entries for each way to invoke _get_event_txn
|
||||||
return cache[(check_redacted, get_prev_content, allow_rejected)]
|
ret = cache[(check_redacted, get_prev_content, allow_rejected)]
|
||||||
|
|
||||||
|
cache_counter.inc_hits("*getEvent*")
|
||||||
|
return ret
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
cache_counter.inc_misses("*getEvent*")
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
start_time = update_counter("event_cache", start_time)
|
start_time = update_counter("event_cache", start_time)
|
||||||
|
@ -685,7 +722,11 @@ class SQLBaseStore(object):
|
||||||
check_redacted=True, get_prev_content=False):
|
check_redacted=True, get_prev_content=False):
|
||||||
|
|
||||||
start_time = time.time() * 1000
|
start_time = time.time() * 1000
|
||||||
update_counter = self._get_event_counters.update
|
|
||||||
|
def update_counter(desc, last_time):
|
||||||
|
curr_time = self._get_event_counters.update(desc, last_time)
|
||||||
|
sql_getevents_timer.inc_by(curr_time - last_time, desc)
|
||||||
|
return curr_time
|
||||||
|
|
||||||
d = json.loads(js)
|
d = json.loads(js)
|
||||||
start_time = update_counter("decode_json", start_time)
|
start_time = update_counter("decode_json", start_time)
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
|
|
||||||
class LruCache(object):
|
class LruCache(object):
|
||||||
"""Least-recently-used cache."""
|
"""Least-recently-used cache."""
|
||||||
# TODO(mjark) Add hit/miss counters
|
|
||||||
# TODO(mjark) Add mutex for linked list for thread safety.
|
# TODO(mjark) Add mutex for linked list for thread safety.
|
||||||
def __init__(self, max_size):
|
def __init__(self, max_size):
|
||||||
cache = {}
|
cache = {}
|
||||||
|
|
|
@ -0,0 +1,161 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2015 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from tests import unittest
|
||||||
|
|
||||||
|
from synapse.metrics.metric import (
|
||||||
|
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CounterMetricTestCase(unittest.TestCase):
    """Tests CounterMetric rendering in scalar and labelled (vector) form."""

    def test_scalar(self):
        counter = CounterMetric("scalar")

        # Scalar counters render their (zero) value immediately.
        # NOTE: assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(counter.render(), [
            'scalar 0',
        ])

        counter.inc()

        self.assertEqual(counter.render(), [
            'scalar 1',
        ])

        counter.inc_by(2)

        self.assertEqual(counter.render(), [
            'scalar 3'
        ])

    def test_vector(self):
        counter = CounterMetric("vector", labels=["method"])

        # Empty counter doesn't yet know what values it has
        self.assertEqual(counter.render(), [])

        counter.inc("GET")

        self.assertEqual(counter.render(), [
            'vector{method="GET"} 1',
        ])

        counter.inc("GET")
        counter.inc("PUT")

        # Output is sorted by label value.
        self.assertEqual(counter.render(), [
            'vector{method="GET"} 2',
            'vector{method="PUT"} 1',
        ])
|
||||||
|
|
||||||
|
|
||||||
|
class CallbackMetricTestCase(unittest.TestCase):
    """Tests CallbackMetric rendering in scalar and labelled (vector) form."""

    def test_scalar(self):
        d = dict()

        metric = CallbackMetric("size", lambda: len(d))

        # NOTE: assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(metric.render(), [
            'size 0',
        ])

        d["key"] = "value"

        # The callback is re-invoked on every render, so the change shows up.
        self.assertEqual(metric.render(), [
            'size 1',
        ])

    def test_vector(self):
        vals = dict()

        metric = CallbackMetric("values", lambda: vals, labels=["type"])

        self.assertEqual(metric.render(), [])

        # Keys have to be tuples, even if they're 1-element
        vals[("foo",)] = 1
        vals[("bar",)] = 2

        # Output is sorted by label value.
        self.assertEqual(metric.render(), [
            'values{type="bar"} 2',
            'values{type="foo"} 1',
        ])
|
||||||
|
|
||||||
|
|
||||||
|
class DistributionMetricTestCase(unittest.TestCase):
    """Tests DistributionMetric's paired :count/:total counter rendering."""

    def test_scalar(self):
        metric = DistributionMetric("thing")

        # NOTE: assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(metric.render(), [
            'thing:count 0',
            'thing:total 0',
        ])

        metric.inc_by(500)

        self.assertEqual(metric.render(), [
            'thing:count 1',
            'thing:total 500',
        ])

    def test_vector(self):
        metric = DistributionMetric("queries", labels=["verb"])

        self.assertEqual(metric.render(), [])

        metric.inc_by(300, "SELECT")
        metric.inc_by(200, "SELECT")
        metric.inc_by(800, "INSERT")

        # Counts render before totals; within each, sorted by label value.
        self.assertEqual(metric.render(), [
            'queries:count{verb="INSERT"} 1',
            'queries:count{verb="SELECT"} 2',
            'queries:total{verb="INSERT"} 800',
            'queries:total{verb="SELECT"} 500',
        ])
|
||||||
|
|
||||||
|
|
||||||
|
class CacheMetricTestCase(unittest.TestCase):
    """Tests CacheMetric's hit/total counters and live size callback."""

    def test_cache(self):
        d = dict()

        metric = CacheMetric("cache", lambda: len(d))

        # NOTE: assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(metric.render(), [
            'cache:hits 0',
            'cache:total 0',
            'cache:size 0',
        ])

        # A miss bumps total but not hits; size is queried from the dict.
        metric.inc_misses()
        d["key"] = "value"

        self.assertEqual(metric.render(), [
            'cache:hits 0',
            'cache:total 1',
            'cache:size 1',
        ])

        # A hit bumps both hits and total.
        metric.inc_hits()

        self.assertEqual(metric.render(), [
            'cache:hits 1',
            'cache:total 2',
            'cache:size 1',
        ])
|
Loading…
Reference in New Issue