Create Prometheus metrics from a dynamic source in Python

While the process for adding Prometheus metrics to a Python application is well documented in the prometheus_client documentation, adding metrics when you only know the metric names or labels at runtime is trickier. Normal metric classes expect to be declared at module level so the default registry can pick them up. The documentation hints at a solution, however: use a custom collector.

The maintainer of the Python client library has already done an excellent write-up on how to use custom collectors to take data from existing systems and create an exporter with them. The article (on extracting Jenkins job information) is here: https://www.robustperception.io/writing-a-jenkins-exporter-in-python

This article describes how I took a Django application I wrote to store information on service level agreements and exposed component service window information as metrics on the application’s own metrics endpoint (implemented with the excellent django-prometheus package).

Implementation

To add a custom collector to a Django application, you will need to do three things:

  1. Have a model or models that supply data you want to turn into metrics.
  2. Write the collector class.
  3. Register the class with the prometheus_client global registry ONCE ONLY, making sure this happens AFTER the database has been initialised, and only when the Django app is actually running. This last part is the one that caused me the most grief.

Assuming you’ve already carried out step one, this is how you go about steps 2 and 3:

Step 2: Write the collector

A collector class is a class that implements the ‘collect’ method. The ‘collect’ method is a generator that yields <type>MetricFamily objects, where <type> can be Counter, GaugeHistogram, Gauge, Histogram, Info, StateSet, Summary, Unknown, or Untyped.
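
Before the full example, here is the minimal shape of a collector (the metric name and hard-coded value below are made up purely for illustration):

from prometheus_client.core import GaugeMetricFamily


class MinimalCollector(object):
    def collect(self):
        # Yield a fully populated metric family; the registry calls collect() on every scrape.
        yield GaugeMetricFamily('example_gauge', 'An example gauge', value=42)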

Example (monitoring.py)

from prometheus_client.core import GaugeMetricFamily
from django.utils import timezone
from .models import Component


SERVICE_WINDOW_LAST_START_METRIC = 'service_window_last_start'
SERVICE_WINDOW_LAST_START_DOC = 'Last start time of the service window'
SERVICE_WINDOW_LAST_END_METRIC = 'service_window_last_end'
SERVICE_WINDOW_LAST_END_DOC = 'Last end time of the service window'
SERVICE_WINDOW_NEXT_START_METRIC = 'service_window_next_start'
SERVICE_WINDOW_NEXT_START_DOC = 'Next start time of the service window'
SERVICE_WINDOW_NEXT_END_METRIC = 'service_window_next_end'
SERVICE_WINDOW_NEXT_END_DOC = 'Next end time of the service window'
SERVICE_WINDOW_IN_WINDOW_METRIC = 'service_window_in_window'
SERVICE_WINDOW_IN_WINDOW_DOC = 'Is the service window active (1 for yes, 0 for no)'


class ComponentCollector(object):
    def collect(self):
        # Called by the registry on every scrape, so the values are always fresh.
        moment = timezone.now()
        components = Component.objects.all()
        metrics = {}

        for component in components:
            labels = component.get_labels()
            prefix = component.name.replace('-', '_') + "_"
            metrics[component.name] = {
                'last_start': GaugeMetricFamily(prefix + SERVICE_WINDOW_LAST_START_METRIC,
                                                SERVICE_WINDOW_LAST_START_DOC, labels=labels.keys()),

                'last_end': GaugeMetricFamily(prefix + SERVICE_WINDOW_LAST_END_METRIC,
                                              SERVICE_WINDOW_LAST_END_DOC, labels=labels.keys()),

                'next_start': GaugeMetricFamily(prefix + SERVICE_WINDOW_NEXT_START_METRIC,
                                                SERVICE_WINDOW_NEXT_START_DOC, labels=labels.keys()),

                'next_end': GaugeMetricFamily(prefix + SERVICE_WINDOW_NEXT_END_METRIC,
                                              SERVICE_WINDOW_NEXT_END_DOC, labels=labels.keys()),

                'in_window': GaugeMetricFamily(prefix + SERVICE_WINDOW_IN_WINDOW_METRIC,
                                               SERVICE_WINDOW_IN_WINDOW_DOC, labels=labels.keys()),
            }

            metrics[component.name]['last_start'].add_metric(labels=labels.values(),
                                                             value=component.get_last_start_time(moment).timestamp())
            # NOTE: assumes the model also provides get_last_end_time(), mirroring the other accessors.
            metrics[component.name]['last_end'].add_metric(labels=labels.values(),
                                                           value=component.get_last_end_time(moment).timestamp())
            metrics[component.name]['next_start'].add_metric(labels=labels.values(),
                                                             value=component.get_next_start_time(moment).timestamp())
            metrics[component.name]['next_end'].add_metric(labels=labels.values(),
                                                           value=component.get_next_end_time(moment).timestamp())
            metrics[component.name]['in_window'].add_metric(labels=labels.values(),
                                                            value=int(component.in_window(moment)))
        # Yield every metric family so the registry can expose them.
        for component_metrics in metrics.values():
            for metric in component_metrics.values():
                yield metric

In this example, I’ve taken a Component model that exposes the last and next start and end times of its service window, and indicates whether the current time falls within a service window for the component. The metrics:

  • <component_name>_service_window_last_start
  • <component_name>_service_window_last_end
  • <component_name>_service_window_next_start
  • <component_name>_service_window_next_end
  • <component_name>_service_window_in_window

are created, and the labels attached to the component are added as metric labels on each metric.

The <type>MetricFamily classes do the rest of the work. The default Prometheus registry runs collect once at registration time to capture the metric definitions, then runs it again on every scrape to obtain updated values.
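
If you want to sanity-check what the collector produces without waiting for a scrape (for example from a Django shell), a quick sketch like this will print the exposition output; the import path is assumed from the app layout used in this article:

from prometheus_client import CollectorRegistry, generate_latest

from component_sla_monitor.monitoring import ComponentCollector  # import path assumed

# Use a throwaway registry so we don't interfere with the global one.
registry = CollectorRegistry()
registry.register(ComponentCollector())

# generate_latest() calls collect() and renders the Prometheus text exposition format.
print(generate_latest(registry).decode('utf-8'))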

Step 3: Registering the collector

This involves some Django trickery in the apps.py module of your application.

You will need to do the following:

  1. Write a migration hook that records whether you are running a migration instead of the actual application.
  2. Write another hook that fires when a connection to the database has been established.
  3. Register both hooks in the AppConfig ready method.
  4. Register your collector class with the prometheus_client registry ONLY the first time the database connection hook fires.

Example (apps.py)

import logging

from django.apps import AppConfig
from django.db.models.signals import post_migrate
from django.db.backends.signals import connection_created
from prometheus_client import REGISTRY

logger = logging.getLogger(__name__)


migration_executed = False
monitoring_initialised = False


def post_migration_callback(sender, **kwargs):
    global migration_executed
    logger.info('Migration executed')
    migration_executed = True


def connection_callback(sender, connection, **kwargs):
    global monitoring_initialised
    # Check to see if we are not running a unittest temp db
    if not connection.settings_dict['NAME'] == 'file:memorydb_default?mode=memory&cache=shared':
        if not monitoring_initialised:
            from .monitoring import ComponentCollector
            REGISTRY.register(ComponentCollector())
            monitoring_initialised = True


class ComponentSlaMonitorConfig(AppConfig):
    name = 'component_sla_monitor'

    def ready(self):
        global migration_executed
        post_migrate.connect(post_migration_callback, sender=self)

        if not migration_executed:
            connection_created.connect(connection_callback)

Note that we only import the collector inside the connection_callback hook. This is because importing it at the top of the file will cause Django database errors.

Also, note the check to see if the DB connection is with an in-memory database. This is to disable monitoring registration during unit tests.

This code is based on Django 2.2. The ready method and some of the hooks have only been available since Django 1.7.

Using Structlog with Gunicorn

Structlog is an awesome tool for outputting useful log information that can easily be picked up by central logging tools like an ELK stack. Setting it to output rich logging events plus context to stdout in JSON format takes you a long way towards implementing the ideals of 12-factor app logging.

However, the first Google hit for “structlog gunicorn” is a FAQ from structlog version 16 that implies you should probably use another library to output gunicorn logs in JSON.

Since I found this more than a little disappointing, I did some more digging and discovered that setting up gunicorn to use structlog is relatively simple, and with a small addition, completely awesome.

Configuring structlog

The key to configuring structlog in gunicorn is that, on startup, gunicorn first looks for a file called gunicorn.conf.py and executes its contents. In there, you can set all of gunicorn’s runtime settings, but since it is a regular Python file you can also run arbitrary Python code. The key setting here is:

logconfig_dict

You can set this to configure all aspects of logging, including the key part: the formatters. Creating a json_formatter entry allows you to hook structlog into the stdlib logging module in the same way the developer describes on this page in the section “Rendering Using structlog-based Formatters Within logging”. The “foreign_pre_chain” entry also allows you to chain structlog processors to do interesting things to your log entries (more on that later). Here’s the code you need to add to gunicorn.conf.py:

import structlog

# --- Structlog logging initialisation code
timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
pre_chain = [
    # Add the log level and a timestamp to the event_dict if the log entry
    # is not from structlog.
    structlog.stdlib.add_log_level,
    structlog.stdlib.add_logger_name,
    timestamper
]

logconfig_dict = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json_formatter": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processor": structlog.processors.JSONRenderer(),
            "foreign_pre_chain": pre_chain,
        }
    },
    "handlers": {
        "error_console": {
            "class": "logging.StreamHandler",
            "formatter": "json_formatter",
        },
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json_formatter",
        }
    },
}

Adding this block to your gunicorn.conf.py file will make gunicorn emit all of its log entries in JSON, adding level, logger, and timestamp fields to each entry.

Notes

By trial and error, I discovered that gunicorn uses two log handlers:

  • console
  • error_console

Overriding these in the logconfig_dict is what makes this method work.

Access logs as structured data

If you look at the output from gunicorn on a running server, you will see a lot of events in the JSON logs that contain what looks like Apache combined log format (because that’s exactly what it is). Wouldn’t it be nice to split that wonderful contextual information into separate fields so you don’t have to do it later in your logging platform?

Structlog makes that easy: Just write a processor function.

Processor functions have the following signature:

def some_processor_func(logger, log_method, event_dict):
    # Do something with the event_dict here
    ...
    return event_dict
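
As a trivial illustration (the field name and value here are invented), a processor that stamps every entry with a static service name would look like this:

def add_service_name(logger, log_method, event_dict):
    # Add a static field to every entry; handy for filtering in a central logging platform.
    event_dict["service"] = "my-demo-service"
    return event_dict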

You can add them to the processing chain in the same way that the timestamper processor was added to the pre_chain list in the configuration above. For this, I borrowed some code from https://www.seehuhn.de/blog/52.html to parse the combined log fields into a dictionary, which is then merged into the event_dict:

# structlog_helper.py
import re

def combined_logformat(logger, name, event_dict):
    if event_dict.get('logger') == "gunicorn.access":
        message = event_dict['event']

        parts = [
            r'(?P<host>\S+)',  # host %h
            r'\S+',  # indent %l (unused)
            r'(?P<user>\S+)',  # user %u
            r'\[(?P<time>.+)\]',  # time %t
            r'"(?P<request>.+)"',  # request "%r"
            r'(?P<status>[0-9]+)',  # status %>s
            r'(?P<size>\S+)',  # size %b (careful, can be '-')
            r'"(?P<referer>.*)"',  # referer "%{Referer}i"
            r'"(?P<agent>.*)"',  # user agent "%{User-agent}i"
        ]
        pattern = re.compile(r'\s+'.join(parts) + r'\s*\Z')
        m = pattern.match(message)
        if m is None:
            # Not a combined-format line; leave the entry untouched
            return event_dict
        res = m.groupdict()

        if res["user"] == "-":
            res["user"] = None

        res["status"] = int(res["status"])

        if res["size"] == "-":
            res["size"] = 0
        else:
            res["size"] = int(res["size"])

        if res["referer"] == "-":
            res["referer"] = None

        event_dict.update(res)

    return event_dict

Note that the code checks the logger field in the event_dict to see whether the entry came from the gunicorn.access logger, and only then tries to parse it. To use this helper, import the module and add the function to the pre_chain list. Example below:

import structlog
from . import structlog_helper # or wherever you decide to put the file

# --- Structlog logging initialisation code
timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
pre_chain = [
    # Add the log level and a timestamp to the event_dict if the log entry
    # is not from structlog.
    structlog.stdlib.add_log_level,
    structlog.stdlib.add_logger_name,
    timestamper,
    structlog_helper.combined_logformat # This does the magic!
]

# rest of config is identical to first version
...

So what do you get? Here’s the output from a demo project I’ve been writing for work. Sorry, I can’t publish the sources:

{"event": "Booting worker with pid: 30099", "level": "info", "logger": "gunicorn.error", "timestamp": "2019-08-15 14:19:14"}
{"event": "Booting worker with pid: 30100", "level": "info", "logger": "gunicorn.error", "timestamp": "2019-08-15 14:19:14"}
{"event": "10.96.78.6 - - [15/Aug/2019:14:19:23 +0000] \"GET /metrics HTTP/1.1\" 200 14134 \"-\" \"Prometheus/2.11.0\"", "level": "info", "logger": "gunicorn.access", "timestamp": "2019-08-15 14:19:23", "host": "10.96.78.6", "user": null, "time": "15/Aug/2019:14:19:23 +0000", "request": "GET /metrics HTTP/1.1", "status": 200, "size": 14134, "referer": null, "agent": "Prometheus/2.11.0"}

As you can see, the access logs are output with the combined log fields split out into their own JSON fields, ready for ingestion by a central logging system listening to the container output.

Summary

Structlog is an awesome tool for writing applications that log machine-readable output with context information in fields instead of hard-to-parse strings. Using this configuration, you can easily get gunicorn to emit context-rich output in JSON as well.