Metrics are a means for workers to publish statistics about their operations for real-time plotting and later analysis. Vumi provides built-in support for publishing metric values to Carbon (the storage engine used by Graphite).

Using metrics from a worker

The set of metrics a worker wishes to plublish are managed via a MetricManager instance. The manager acts both as a container for the set of metrics and the publisher that pushes metric values out via AMQP.


class MyWorker(Worker):

    def startWorker(self, config):
        self.metrics = yield self.start_publisher(MetricManager,

In the example above a MetricManager publisher is started. All its metric names with be prefixed with myworker.. Two metrics are registered – a.value whose values will be averaged and a.count whose values will be summed. Later, the worker may set the metric values like so:

class vumi.blinkenlights.metrics.MetricManager(prefix, publish_interval=5, on_publish=None, publisher=None)

Utility for creating and monitoring a set of metrics.

  • prefix (str) – Prefix for the name of all metrics registered with this manager.
  • publish_interval (int in seconds) – How often to publish the set of metrics.
  • on_publish (f(metric_manager)) – Function to call immediately after metrics after published.
oneshot(metric, value)

Publish a single value for the given metric.

  • metric (Metric) – Metric object to register. Will have the manager’s prefix added to its name.
  • value (float) – The value to publish for the metric.

Publish all waiting metrics.


Register a new metric object to be managed by this metric set.

A metric can be registered with only one metric set.

Parameters:metric (Metric) – Metric object to register. The metric will have its .manage() method called with this manager as the manager.
Return type:For convenience, returns the metric passed in.

Start publishing metrics in a loop.


Start the metric polling and publishing task.


Stop publishing metrics.


Stop the metric polling and publishing task.


A Metric object publishes floating point values under a metric name. The name is created by combining the prefix from a metric manager with the suffix provided when the metric is constructed. A metric may only be registered with a single MetricManager.

When a metric value is set the value is stored in an internal list until the MetricManager polls the metric for values and publishes them.

A metric includes a list of aggregation functions to request that the metric aggregation workers apply (see later sections). Each metric class has a default list of aggregators but this may be overridden when a metric is created.

class vumi.blinkenlights.metrics.Metric(name, aggregators=None)

Simple metric.

Values set are collected and polled periodically by the metric manager.

  • name (str) – Name of this metric. Will be appened to the MetricManager prefix when this metric is published.
  • aggregators (list of aggregators, optional) – List of aggregation functions to request eventually be applied to this metric. The default is to average the value.


>>> mm = MetricManager('vumi.worker0.')
>>> my_val = mm.register(Metric('my.value'))
>>> my_val.set(1.5)
DEFAULT_AGGREGATORS = [<vumi.blinkenlights.metrics.Aggregator object at 0x7f53c17bb150>]

Default aggregators are [AVG]


Called by MetricManager when this metric is registered.


Called periodically by the MetricManager.


Append a value for later polling.

class vumi.blinkenlights.metrics.Count(name, aggregators=None)

Bases: vumi.blinkenlights.metrics.Metric

A simple counter.


>>> mm = MetricManager('vumi.worker0.')
>>> my_count = mm.register(Count('my.count'))
DEFAULT_AGGREGATORS = [<vumi.blinkenlights.metrics.Aggregator object at 0x7f53c17bb090>]

Default aggregators are [SUM]


Increment the count by 1.

class vumi.blinkenlights.metrics.Timer(*args, **kws)

Bases: vumi.blinkenlights.metrics.Metric

A metric that records time spent on operations.


>>> mm = MetricManager('vumi.worker0.')
>>> my_timer = mm.register(Timer(''))

Using the timer as a context manager:

>>> with my_timer.timeit():
>>>     process_data()

Using the timer without a context manager:

>>> event_timer = my_timer.timeit()
>>> event_timer.start()
>>> d = process_other_data()
>>> d.addCallback(lambda r: event_timer.stop())

Note that timers returned by timeit may only have start and stop called on them once (and only in that order).


Using .start() or .stop() directly or via using the Timer instance itself as a context manager is deprecated because they are not re-entrant and it’s easy to accidentally overlap multiple calls to .start() and .stop() on the same Timer instance (e.g. by letting the reactor run in between).

All applications should be updated to use .timeit().

Deprecated use of .start() and .stop():

>>> my_timer.start()
>>> try:
>>>     process_other_data()
>>> finally:
>>>     my_timer.stop()

Deprecated use of .start() and .stop() via using the Timer itself as a context manager:

>>> with my_timer:
>>>     process_more_data()
DEFAULT_AGGREGATORS = [<vumi.blinkenlights.metrics.Aggregator object at 0x7f53c17bb150>]

Default aggregators are [AVG]

Aggregation functions

Metrics declare which aggregation functions they wish to have applied but the actual aggregation is performed by aggregation workers. All values sent during an aggregation interval are aggregated into a single new value.

Aggregation fulfils two primary purposes:

  • To combine metrics from multiple workers into a single aggregated value (e.g. to determine the average time taken or total number of requests processed across multiple works).
  • To produce metric values at fixed time intervals (as is commonly required by metric storage backends such as Graphite and RRD Tool).

The aggregation functions currently available are:

  • SUM – returns the sum of the supplied values.
  • AVG – returns the arithmetic mean of the supplied values.
  • MIN – returns the minimum value.
  • MAX – returns the maximum value.

All aggregation functions return the value 0.0 if there are no values to aggregate.

New aggregators may be created by instantiating the Aggregator class.


The aggregator must be instantiated in both the process that generates the metric (usually a worker) and the process that performs the aggregation (usually an aggregation worker).

class vumi.blinkenlights.metrics.Aggregator(name, func)

Registry of aggregate functions for metrics.

  • name (str) – Short name for the aggregator.
  • func (f(list of values) -> float) – The aggregation function. Should return a default value if the list of values is empty (usually this default is 0.0).

Metrics aggregation system

The metric aggregation system consists of MetricTimeBucket and MetricAggregator workers.

The MetricTimeBucket workers pull metrics messages from the vumi.metrics exchange and publish them on the vumi.metrics.buckets exchange under a routing key specific to the MetricAggregator which should process them. Once sufficient time has passed for all metrics for a specific time period (a.k.a. time bucket) to have arrived at the aggregator, the requested aggregation functions are applied and the resulting aggregated metrics are published to the vumi.metrics.aggregates exchange.

A typical metric aggregation setup might consist of the following workers: * 2 MetricTimeBucket workers * 3 MetricAggregator workers * a final metric collector, e.g. GraphiteMetricsCollector.

A shell script to start-up such a setup might be:

BUCKET_OPTS="--worker_class=vumi.blinkenlights.MetricTimeBucket \
--set-option=buckets:3 --set-option=bucket_size:5"

AGGREGATOR_OPTS="--worker_class=vumi.blinkenlights.MetricAggregator \


twistd -n vumi_worker $BUCKET_OPTS &
twistd -n vumi_worker $BUCKET_OPTS &

twistd -n vumi_worker $AGGREGATOR_OPTS --set-option=bucket:0 &
twistd -n vumi_worker $AGGREGATOR_OPTS --set-option=bucket:1 &
twistd -n vumi_worker $AGGREGATOR_OPTS --set-option=bucket:2 &

twistd -n vumi_worker $GRAPHITE_OPTS &

Publishing to Graphite

The GraphiteMetricsCollector collects aggregate metrics (produced by the metrics aggregators) and publishes them to Carbon (Graphite’s metric collection package) over AMQP.

You can read about installing a configuring Graphite at but at the very least you will have to enable AMQP support by setting:


in Carbon’s configuration file.

If you have the metric aggregation system configured as in the section above you can start Carbon cache using: --config <config file> --debug start