Create Prometheus metrics from a dynamic source in Python

While the process for adding Prometheus metrics to a Python application is well documented in the prometheus_client documentation, adding metrics whose names or labels you only know at runtime is trickier. The usual metric classes are meant to be declared up front (typically at module level), at which point they register themselves with the default registry. The documentation hints at a solution, however: use a custom collector.

The maintainer of the Python client library has already written an excellent article on using custom collectors to take data from existing systems and turn it into an exporter. The article (on extracting Jenkins job information) is here: https://www.robustperception.io/writing-a-jenkins-exporter-in-python

This article describes how I took a Django application I wrote to store information on service level agreements and exposed component service window information as metrics on the application's own metrics endpoint (implemented with the excellent django-prometheus package).

Implementation

To add a custom collector to a Django application, you will need to do three things:

  1. Have a model or models that supply the data you want to turn into metrics.
  2. Write the collector class.
  3. Register the class with the Prometheus client's global registry ONCE ONLY, and make sure this happens AFTER the database has been initialised, and only when the Django app is actually running. This last part caused me the most grief.

Assuming you’ve already carried out step one, this is how you go about steps 2 and 3:

Step 2: Write the collector

A collector is a class that implements a ‘collect’ method. ‘collect’ is a generator that yields <type>MetricFamily objects, where <type> is one of Counter, GaugeHistogram, Gauge, Histogram, Info, StateSet, Summary, Unknown or Untyped.

Example (monitoring.py)

from prometheus_client.core import GaugeMetricFamily
from django.utils import timezone
from .models import Component


SERVICE_WINDOW_LAST_START_METRIC = 'service_window_last_start'
SERVICE_WINDOW_LAST_START_DOC = 'Last start time of the service window'
SERVICE_WINDOW_LAST_END_METRIC = 'service_window_last_end'
SERVICE_WINDOW_LAST_END_DOC = 'Last end time of the service window'
SERVICE_WINDOW_NEXT_START_METRIC = 'service_window_next_start'
SERVICE_WINDOW_NEXT_START_DOC = 'Next start time of the service window'
SERVICE_WINDOW_NEXT_END_METRIC = 'service_window_next_end'
SERVICE_WINDOW_NEXT_END_DOC = 'Next end time of the service window'
SERVICE_WINDOW_IN_WINDOW_METRIC = 'service_window_in_window'
SERVICE_WINDOW_IN_WINDOW_DOC = 'Is the service window active (1 for yes, 0 for no)'


class ComponentCollector(object):
    def collect(self):
        moment = timezone.now()
        components = Component.objects.all()
        metrics = {}

        for component in components:
            labels = component.get_labels()
            prefix = component.name.replace('-', '_') + "_"
            metrics[component.name] = {
                'last_start': GaugeMetricFamily(''.join( (prefix, SERVICE_WINDOW_LAST_START_METRIC)),
                                                SERVICE_WINDOW_LAST_START_DOC, labels=labels.keys()),

                'last_end': GaugeMetricFamily(''.join( (prefix, SERVICE_WINDOW_LAST_END_METRIC)),
                                              SERVICE_WINDOW_LAST_END_DOC, labels=labels.keys()),

                'next_start': GaugeMetricFamily(''.join( (prefix, SERVICE_WINDOW_NEXT_START_METRIC)),
                                                SERVICE_WINDOW_NEXT_START_DOC, labels=labels.keys()),

                'next_end': GaugeMetricFamily(''.join( (prefix, SERVICE_WINDOW_NEXT_END_METRIC)),
                                              SERVICE_WINDOW_NEXT_END_DOC, labels=labels.keys()),

                'in_window': GaugeMetricFamily(''.join( (prefix, SERVICE_WINDOW_IN_WINDOW_METRIC)),
                                               SERVICE_WINDOW_IN_WINDOW_DOC, labels=labels.keys()),
            }

            metrics[component.name]['last_start'].add_metric(labels=labels.values(),
                                                             value=component.get_last_start_time(moment).timestamp())
            metrics[component.name]['last_end'].add_metric(labels=labels.values(),
                                                           value=component.get_last_end_time(moment).timestamp())
            metrics[component.name]['next_start'].add_metric(labels=labels.values(),
                                                             value=component.get_next_start_time(moment).timestamp())
            metrics[component.name]['next_end'].add_metric(labels=labels.values(),
                                                           value=component.get_next_end_time(moment).timestamp())
            metrics[component.name]['in_window'].add_metric(labels=labels.values(),
                                                            value=int(component.in_window(moment)))
        for comp in metrics.keys():
            for metric in metrics[comp].values():
                yield metric

In this example, I’ve taken a Component model that exposes the last and next start and end times of its service window, and indicates whether the current time falls inside a service window for the component. The following metrics:

  • <component_name>_service_window_last_start
  • <component_name>_service_window_last_end
  • <component_name>_service_window_next_start
  • <component_name>_service_window_next_end
  • <component_name>_service_window_in_window

are created, and the component’s labels are attached to each of them as metric labels.
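Because these names are built at runtime from component names, and the code above only replaces ‘-’, a component name containing a space, a dot or a leading digit would still produce an invalid metric name. Below is a minimal sketch of a stricter prefix function, based on the Prometheus metric-name rule [a-zA-Z_:][a-zA-Z0-9_:]*. The helper names metric_prefix and is_valid_metric_name are hypothetical, not part of the original app.

```python
import re

# Prometheus metric names must match this pattern. Colons are reserved for
# recording rules, so metric_prefix() below never emits them.
METRIC_NAME_RE = re.compile(r"[a-zA-Z_:][a-zA-Z0-9_:]*")


def metric_prefix(component_name: str) -> str:
    """Hypothetical helper: turn any component name into a safe prefix."""
    # Replace every character that is not an ASCII letter, digit or underscore.
    prefix = re.sub(r"[^a-zA-Z0-9_]", "_", component_name)
    # A metric name may not start with a digit, so prepend an underscore.
    if not prefix or prefix[0].isdigit():
        prefix = "_" + prefix
    return prefix + "_"


def is_valid_metric_name(name: str) -> bool:
    return METRIC_NAME_RE.fullmatch(name) is not None


for name in ["web-frontend", "db 2.primary", "3rd-party"]:
    full_name = metric_prefix(name) + "service_window_in_window"
    print(full_name, is_valid_metric_name(full_name))
```

In ComponentCollector, metric_prefix(component.name) would take the place of component.name.replace('-', '_') + "_".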

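The <type>MetricFamily classes do the rest of the work. The default Prometheus registry calls collect once when the collector is registered, to record the metric names, and then again on every scrape to obtain current values. That registration-time call queries the database, which is why the collector must not be registered before the database is available.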
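To watch the collect calls happen outside Django, here is a sketch that replaces the Component model with a hypothetical in-memory list and counts how often collect() runs. It assumes prometheus_client is installed. It uses a private CollectorRegistry created with auto_describe=True, which is how the global REGISTRY is created. With that setting, register() calls collect() on a collector that has no describe() method in order to learn its metric names.

```python
from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.core import GaugeMetricFamily

# Hypothetical stand-in for Component.objects.all(): names and labels are
# only known at runtime, just like rows in the database.
COMPONENTS = [
    {"name": "web-frontend", "labels": {"team": "ops", "env": "prod"}, "in_window": True},
    {"name": "billing-api", "labels": {"team": "finance", "env": "prod"}, "in_window": False},
]


class DemoComponentCollector:
    """Cut-down version of ComponentCollector that only emits in_window."""

    def __init__(self, components):
        self.components = components
        self.collect_calls = 0  # counts how often the registry calls collect()

    def collect(self):
        self.collect_calls += 1
        for component in self.components:
            prefix = component["name"].replace("-", "_") + "_"
            labels = component["labels"]
            family = GaugeMetricFamily(
                prefix + "service_window_in_window",
                "Is the service window active (1 for yes, 0 for no)",
                labels=list(labels.keys()),
            )
            family.add_metric(list(labels.values()), int(component["in_window"]))
            yield family


# auto_describe=True: register() calls collect() to discover metric names.
registry = CollectorRegistry(auto_describe=True)
collector = DemoComponentCollector(COMPONENTS)
registry.register(collector)
print("collect() calls after register:", collector.collect_calls)

# Each exposition (what a scrape triggers) calls collect() again.
print(generate_latest(registry).decode())
print("collect() calls after one scrape:", collector.collect_calls)
```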

Step 3: Registering the collector

This involves some Django trickery in the apps.py module of your app.

You will need to do the following:

  1. Write a migration hook that records whether you are running a migration instead of the actual application.
  2. Write another hook that fires when a database connection has been created.
  3. Connect both hooks in the AppConfig ready method.
  4. Register your Collector class with the Prometheus registry ONLY the first time the database connection hook fires.

Example (apps.py)

import logging

from django.apps import AppConfig

from django.db.models.signals import post_migrate
from django.db.backends.signals import connection_created
from prometheus_client import REGISTRY

logger = logging.getLogger(__name__)

migration_executed = False
monitoring_initialised = False


def post_migration_callback(sender, **kwargs):
    global migration_executed
    logger.info('Migration executed')
    migration_executed = True


def connection_callback(sender, connection, **kwargs):
    global monitoring_initialised
    # Check to see if we are not running a unittest temp db
    if connection.settings_dict['NAME'] != 'file:memorydb_default?mode=memory&cache=shared':
        if not monitoring_initialised:
            from .monitoring import ComponentCollector
            REGISTRY.register(ComponentCollector())
            monitoring_initialised = True


class ComponentSlaMonitorConfig(AppConfig):
    name = 'component_sla_monitor'

    def ready(self):
        global migration_executed
        post_migrate.connect(post_migration_callback, sender=self)

        if not migration_executed:
            connection_created.connect(connection_callback)

Note that we only import the Collector inside the connection_callback hook. Importing it at the top of apps.py would also import the models before Django’s app registry is ready, which raises errors (typically AppRegistryNotReady).

Also note the check for an in-memory database connection: it disables collector registration during unit tests.
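If your application server handles requests on several threads, connection_created can in principle fire on two threads at once. The plain global flag could then let REGISTRY.register() run twice, and the second call raises a duplicate-timeseries ValueError. A minimal, stdlib-only sketch of a lock-guarded "run once" wrapper follows. RegisterOnce is a hypothetical helper, not part of Django or prometheus_client, and the lambda stands in for the real registration call.

```python
import threading


class RegisterOnce:
    """Hypothetical signal-receiver-compatible wrapper: runs `register` once."""

    def __init__(self, register):
        self._register = register
        self._lock = threading.Lock()
        self._done = False

    def __call__(self, sender=None, **kwargs):
        # Fast path: skip the lock once registration has happened.
        if self._done:
            return
        with self._lock:
            # Re-check inside the lock: another thread may have won the race.
            if not self._done:
                self._register()
                self._done = True


# Stand-in for REGISTRY.register(ComponentCollector()).
registrations = []
on_connection = RegisterOnce(lambda: registrations.append("collector"))

threads = [threading.Thread(target=on_connection, kwargs={"connection": None})
           for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(registrations)
```

If you connect an instance like this to a Django signal, pass weak=False, because by default Django holds receivers only through weak references. You would also still need the in-memory database check from connection_callback.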

This code is based on Django 2.2. The ready method and some of the hooks have only been available since Django 1.7.

Prometheus: Adding a label to a target

Prometheus relabel configs are notoriously badly documented, so here’s how to do something simple that I couldn’t find documented anywhere: adding a label to all metrics coming from a specific scrape target.

Example

scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090']
    # Add your relabel config under the scrape configs
    relabel_configs:
        # source label must be one that exists, so use __address__
      - source_labels: [__address__]
        # target label is the one you want to create
        target_label: my_new_label
        replacement: "my-new-label-value"

And there you have it.

This will create a new label “my_new_label” with the fixed value “my-new-label-value“.

How does it work?

If you don’t supply them, the default settings for a relabel_config are:

  • action: replace
  • regex: (.*)
  • separator: ;
  • replacement: $1

By choosing a single source label that always exists (__address__ is always present), you are guaranteed a source value to match against. The default regex (.*) matches any value, so the replacement is always carried out. However, because our replacement string contains no match-group references (such as $1), the literal string is copied into target_label as-is. This is just a very specific case of how you can use a relabel_config to copy (parts of) one label into another (new) label.
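The replace action described above can be sketched in Python. This is a simplified model, not Prometheus’s implementation. Prometheus uses Go regular expressions and anchors them at both ends, which fullmatch() imitates here. The sketch ignores named-group references and Prometheus’s rule that an empty result deletes the target label. relabel_replace is a hypothetical name.

```python
import re


def relabel_replace(labels, source_labels, target_label,
                    regex="(.*)", separator=";", replacement="$1"):
    """Simplified model of a relabel_config with action: replace."""
    # 1. Join the values of the source labels (missing labels count as "").
    value = separator.join(labels.get(name, "") for name in source_labels)
    # 2. The regex must match the whole joined value.
    match = re.fullmatch(regex, value)
    if match is None:
        return dict(labels)  # no match: labels are left unchanged

    # 3. Expand $1 / ${1} style group references in the replacement string.
    def expand(ref):
        index = int(ref.group(1) or ref.group(2))
        if index > match.re.groups:
            return ""
        return match.group(index) or ""

    result = re.sub(r"\$(?:\{(\d+)\}|(\d+))", expand, replacement)
    new_labels = dict(labels)
    new_labels[target_label] = result
    return new_labels


target = {"__address__": "localhost:9090", "job": "prometheus"}

# The config from the example: a literal replacement with no group references.
print(relabel_replace(target, ["__address__"], "my_new_label",
                      replacement="my-new-label-value"))
# Copying part of a label instead: keep only the host part of __address__.
print(relabel_replace(target, ["__address__"], "host", regex="(.*):.*"))
```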

Using Structlog with Gunicorn

Structlog is an awesome tool for outputting useful log information that can easily be picked up by central logging tools like an ELK stack. Setting it up to output rich log events plus context to stdout in JSON format gets you a long way towards the ideals of 12-factor app logging.

However, the first Google hit for “structlog gunicorn” is a FAQ entry from structlog version 16 implying that you should probably use another library to output gunicorn logs in JSON.

Since I found this to be more than a little disappointing, I did some more digging, and discovered that setting up gunicorn to use structlog is relatively simple, and with a small addition, is completely awesome.

Configuring structlog

The key to configuring structlog in gunicorn is that, on startup, gunicorn looks for a file called gunicorn.conf.py (by default in the directory gunicorn is run from) and executes its contents. In there you can set all of gunicorn’s runtime settings, but because it is ordinary Python, you can also run arbitrary Python code. The key setting here is:

logconfig_dict

You can set this to configure every aspect of logging, including the key part: the formatters. Creating a json_formatter entry lets you hook structlog into the stdlib logger in the same way the structlog documentation describes in the section “Rendering Using structlog-based Formatters Within logging”. The “foreign_pre_chain” entry also lets you chain structlog processors that do interesting things to your log entries (more on that later). Here’s the code you need to add to gunicorn.conf.py:

import structlog

# --- Structlog logging initialisation code
timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
pre_chain = [
    # Add the log level and a timestamp to the event_dict if the log entry
    # is not from structlog.
    structlog.stdlib.add_log_level,
    structlog.stdlib.add_logger_name,
    timestamper
]

logconfig_dict = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json_formatter": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processor": structlog.processors.JSONRenderer(),
            "foreign_pre_chain": pre_chain,
        }
    },
    "handlers": {
        "error_console": {
            "class": "logging.StreamHandler",
            "formatter": "json_formatter",
        },
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json_formatter",
        }
    },
}

Adding this block to your gunicorn.conf.py file will make gunicorn emit all of its log entries as JSON, adding level, logger and timestamp fields to each entry.

Notes

By trial and error, I discovered that gunicorn uses two log handlers:

  • console
  • error_console

Overriding these in the logconfig_dict is what makes this method work.
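Why does overriding those two handler names work? As far as I can tell from gunicorn’s glogging module, when logconfig_dict is set, gunicorn copies its built-in default logging config and calls dict.update() with yours. That shallow merge replaces the top-level "handlers" and "formatters" sections wholesale. It keeps the default "loggers" section, whose gunicorn.error and gunicorn.access loggers refer to handlers by name. The sketch below reproduces that merge with the standard library only. The defaults dict is an abbreviated, hypothetical stand-in for gunicorn’s. A small JsonFormatter class stands in for structlog’s ProcessorFormatter so the example runs without structlog installed.

```python
import io
import json
import logging
import logging.config

# Abbreviated, hypothetical stand-in for gunicorn's default logging config:
# its loggers refer to handlers by the names "console" and "error_console".
GUNICORN_STYLE_DEFAULTS = {
    "version": 1,
    "disable_existing_loggers": False,
    "loggers": {
        "gunicorn.error": {"level": "INFO", "handlers": ["error_console"], "propagate": False},
        "gunicorn.access": {"level": "INFO", "handlers": ["console"], "propagate": False},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
        "error_console": {"class": "logging.StreamHandler"},
    },
}

LOG_BUFFER = io.StringIO()  # captures output so the result can be inspected


class JsonFormatter(logging.Formatter):
    """Stdlib-only stand-in for structlog's ProcessorFormatter + JSONRenderer."""

    def format(self, record):
        return json.dumps({"event": record.getMessage(),
                           "level": record.levelname.lower(),
                           "logger": record.name})


# What goes in logconfig_dict: only formatters and handlers, as in the article.
user_logconfig = {
    "formatters": {"json_formatter": {"()": JsonFormatter}},
    "handlers": {
        "console": {"class": "logging.StreamHandler",
                    "formatter": "json_formatter", "stream": LOG_BUFFER},
        "error_console": {"class": "logging.StreamHandler",
                          "formatter": "json_formatter", "stream": LOG_BUFFER},
    },
}

# Shallow merge: "handlers"/"formatters" are replaced, "loggers" survives.
merged = GUNICORN_STYLE_DEFAULTS.copy()
merged.update(user_logconfig)
logging.config.dictConfig(merged)

logging.getLogger("gunicorn.access").info(
    '127.0.0.1 - - [15/Aug/2019:14:19:23 +0000] "GET / HTTP/1.1" 200 5 "-" "curl/7.64.0"')
print(LOG_BUFFER.getvalue())
```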

access_logs as structured data

If you look at the output from gunicorn on a running server, you will see a lot of events in the JSON logs that contain what looks like Apache combined log format (because that’s exactly what it is). Wouldn’t it be nice to split that wonderful contextual information into separate fields so you don’t have to do that later in your logging platform?

Structlog makes that easy: Just write a processor function.

Processor functions have the following signature:

def some_processor_func(logger, log_method, event_dict):
    # Do something with the event_dict here
    ...
    return event_dict
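To make the chaining concrete, here is a stdlib-only sketch of what happens to an event_dict as it passes through a list like pre_chain: each processor receives the dict returned by the previous one. The two processors and run_chain are hypothetical illustrations, not part of structlog. structlog’s own loop also handles things like the DropEvent exception, which this ignores.

```python
def add_service_name(logger, method_name, event_dict):
    """Hypothetical processor: tag every entry with a fixed service name."""
    event_dict.setdefault("service", "demo-app")
    return event_dict


def uppercase_level(logger, method_name, event_dict):
    """Hypothetical processor: normalise the level field to upper case."""
    if "level" in event_dict:
        event_dict["level"] = event_dict["level"].upper()
    return event_dict


def run_chain(processors, event_dict, logger=None, method_name="info"):
    """Feed the event_dict through each processor in order."""
    for processor in processors:
        event_dict = processor(logger, method_name, event_dict)
    return event_dict


result = run_chain([add_service_name, uppercase_level],
                   {"event": "Booting worker with pid: 30099", "level": "info"})
print(result)
```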

You can add them to the processing chain in the same way that the timestamper class has been added in the pre_chain list in the configuration above. For this, I borrowed some code from https://www.seehuhn.de/blog/52.html to parse combined log fields into a dictionary and added this to the event_dict like this:

# structlog_helper.py
import re

# One regex part per field of the Apache combined log format, which is what
# gunicorn's default access log format produces. Compiled once at import time.
_PARTS = [
    r'(?P<host>\S+)',  # host %h
    r'\S+',  # ident %l (unused)
    r'(?P<user>\S+)',  # user %u
    r'\[(?P<time>.+)\]',  # time %t
    r'"(?P<request>.+)"',  # request "%r"
    r'(?P<status>[0-9]+)',  # status %>s
    r'(?P<size>\S+)',  # size %b (careful, can be '-')
    r'"(?P<referer>.*)"',  # referer "%{Referer}i"
    r'"(?P<agent>.*)"',  # user agent "%{User-agent}i"
]
_PATTERN = re.compile(r'\s+'.join(_PARTS) + r'\s*\Z')


def combined_logformat(logger, name, event_dict):
    """Split a gunicorn access log line into separate event_dict fields."""
    if event_dict.get('logger') != "gunicorn.access":
        return event_dict

    m = _PATTERN.match(event_dict['event'])
    if m is None:
        # Not in combined log format (e.g. a custom access_log_format): leave as-is
        return event_dict

    res = m.groupdict()

    if res["user"] == "-":
        res["user"] = None

    res["status"] = int(res["status"])

    if res["size"] == "-":
        res["size"] = 0
    else:
        res["size"] = int(res["size"])

    if res["referer"] == "-":
        res["referer"] = None

    event_dict.update(res)
    return event_dict

Note that the code checks the logger field in the dict to see if it is a gunicorn.access logger, and if so, it tries to parse the entries. To use this helper, you need to import this module, then add the function to the pre_chain list. Example below:

import structlog
import structlog_helper  # must be importable from gunicorn.conf.py (e.g. via PYTHONPATH); the config file is not a package, so "from . import" fails

# --- Structlog logging initialisation code
timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
pre_chain = [
    # Add the log level and a timestamp to the event_dict if the log entry
    # is not from structlog.
    structlog.stdlib.add_log_level,
    structlog.stdlib.add_logger_name,
    timestamper,
    structlog_helper.combined_logformat # This does the magic!
]

# rest of config is identical to first version
...

So what do you get? Here’s the output from a demo project I’ve been writing for work. Sorry, I can’t publish the sources:

{"event": "Booting worker with pid: 30099", "level": "info", "logger": "gunicorn.error", "timestamp": "2019-08-15 14:19:14"}
{"event": "Booting worker with pid: 30100", "level": "info", "logger": "gunicorn.error", "timestamp": "2019-08-15 14:19:14"}
{"event": "10.96.78.6 - - [15/Aug/2019:14:19:23 +0000] \"GET /metrics HTTP/1.1\" 200 14134 \"-\" \"Prometheus/2.11.0\"", "level": "info", "logger": "gunicorn.access", "timestamp": "2019-08-15 14:19:23", "host": "10.96.78.6", "user": null, "time": "15/Aug/2019:14:19:23 +0000", "request": "GET /metrics HTTP/1.1", "status": 200, "size": 14134, "referer": null, "agent": "Prometheus/2.11.0"}

As you can see, the access logs are output with the combined log fields split out into their own json entries, ready for parsing by a central logger listening to the container output.

Summary

Structlog is an awesome tool for writing applications that log machine-readable output with context information in fields instead of hard-to-parse strings. Using this configuration, you can easily get gunicorn to emit context-rich output in JSON.

Prometheus Alertmanager cluster in Docker Swarm

Prometheus monitoring and Docker combine together really well, but configuring an Alertmanager cluster can be a bit of a challenge if you don’t find the trick. This article shows a method that both works, and isn’t overly complicated to set up.

The trick is that, while it isn’t possible to pass the cluster.peer parameters correctly to a single service entry, you can use two or more numbered service entries instead, and define a network alias that combines them into one DNS-searchable name for the rest of your configuration.

Docker compose configuration

This is a sample docker compose file that can be deployed to Docker swarm using docker stack deploy --compose-file …

# docker-compose.yml
version: '3.7'
services:
  alertmanager_1:
    image: prom/alertmanager:latest
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'
      - '--cluster.peer=tasks.alertmanager_2:9094'
    deploy:
      mode: global
      placement:
        constraints:
          - node.hostname == swarm-manager000000
    networks:
      prometheus_stack:
        aliases:
          - alertmanager
    ports:
        - '19093:9093'
    volumes:
      - alertmanager-data:/alertmanager
      - alertmanager-config:/etc/alertmanager
  alertmanager_2:
    image: prom/alertmanager:latest
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'
      - '--cluster.peer=tasks.alertmanager_1:9094'
    deploy:
      mode: global
      placement:
        constraints:
          - node.hostname == swarm-manager000001
    networks:
      prometheus_stack:
        aliases:
          - alertmanager
    ports:
        - '29093:9093'
    volumes:
      - alertmanager-data:/alertmanager
      - alertmanager-config:/etc/alertmanager
networks:
  prometheus_stack:
    driver: overlay
    attachable: true 
volumes:
  alertmanager-data: {}  
  alertmanager-config: {} 

Note the following aspects:

  • Each alertmanager gets a named service, locked to a single node via placement constraints (fill in your own node names here).
  • The cluster.peer setting refers to the other alertmanager service(s) by their tasks.<servicename> DNS name.
  • This configuration is ready to use my method for updating configuration using git push as described in my article on Dynamic Docker configuration management
  • Because we’ve defined a network alias on each alertmanager service, we can use DNS service discovery in our Prometheus config file to find the alertmanagers, using docker swarm’s tasks.<servicename> DNS entries.

prometheus.yml

...
# Alertmanager configuration
alerting:
  alertmanagers:
  - dns_sd_configs:
    - names:
      - 'tasks.alertmanager'
      type: 'A'
      port: 9093
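Roughly speaking, the dns_sd_configs block above does the following with type A: it looks up the name and turns every A record into a target on the configured port. Here is a stdlib-only sketch of that idea. discover_targets and fake_resolver are hypothetical names, and real Prometheus also repeats the lookup periodically (every 30s by default). The resolver is injectable so the example runs without Docker’s DNS.

```python
import socket


def discover_targets(name, port, resolve=socket.gethostbyname_ex):
    """Return one "ip:port" target per IPv4 address that `name` resolves to."""
    _hostname, _aliases, addresses = resolve(name)
    # dict.fromkeys drops duplicate addresses while keeping resolver order.
    return [f"{address}:{port}" for address in dict.fromkeys(addresses)]


def fake_resolver(name):
    # Stand-in for Docker's embedded DNS answering tasks.alertmanager.
    return (name, [], ["10.0.1.5", "10.0.1.7"])


print(discover_targets("tasks.alertmanager", 9093, resolve=fake_resolver))
```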

Things to be aware of…

  • When running on a cluster, if you are using a volume for storing the alertmanager configuration, you should be using a shared storage volume driver. My own swarm is running on Docker for Azure, and uses the cloudstor:azure driver.
    • If you can’t do this, you’ll have to attach your config files using configs: blocks. For static configurations this is fine, but in an active environment, versioning your config names becomes a nuisance very fast.

Dynamic Docker config management

I’ve been working on building a Prometheus monitoring stack in Docker swarm, and I ran into an interesting challenge, namely, how to separate my prometheus configuration update process from my container deployment process. The solution I came up with is one that I think can be adapted for other applications with similar properties.

Configuration repository with polling

Prometheus, like many open source devops tools, uses configuration files to manage its configuration. The challenge was to find a way of connecting these files to the prometheus docker container while also allowing the configuration to be updated from version control. Prometheus already has an external trigger to load an updated configuration:

# Load prometheus with web.enable-lifecycle to allow reload via HTTP POST
prometheus --web.enable-lifecycle [other startup flags...]

# Trigger configuration reload
curl -X POST http://prometheus:9090/-/reload

The trick is to do the following:

  • pass --web.enable-lifecycle as a command line parameter to your prometheus container
  • mount an external volume to your prometheus containers as /etc/prometheus (or wherever you have configured prometheus to find its configuration)
  • use a second container that also mounts the configuration volume, and runs an update script on a schedule that:
    • pulls the latest version of the config
    • if there is an update, copies the config to the configuration volume
    • sends an HTTP POST to prometheus’s /-/reload endpoint to make it reload the configuration
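The notification step in the script below uses dig and curl. If you’d rather do it from Python, here is a minimal sketch of the same idea: build one POST to /-/reload for each Prometheus replica address. build_reload_requests is a hypothetical helper, and the addresses are made up. The requests are only constructed here; the commented-out urlopen call is what would actually send them.

```python
import urllib.request


def build_reload_requests(addresses, port=9090, path="/-/reload"):
    """Build one POST request per Prometheus replica (not sent here)."""
    return [
        urllib.request.Request(f"http://{address}:{port}{path}", method="POST")
        for address in addresses
    ]


requests_to_send = build_reload_requests(["10.0.2.3", "10.0.2.4"])
for request in requests_to_send:
    print(request.get_method(), request.full_url)
    # To actually trigger the reload:
    # urllib.request.urlopen(request, timeout=10)
```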

Here is an example of the update script:

#!/bin/bash

CONFIGDIR=$PROMETHEUS_CONFIG_DIR/
WORKDIR=/root
REPODIR=${WORKDIR}/prom-config
REPO=$PROMETHEUS_CONFIG_REPO
NOTIFY_HOST=$PROMETHEUS_HOST_DNS
NOTIFY_PORT=$PROMETHEUS_PORT
NOTIFY_PATH=$PROMETHEUS_NOTIFY_PATH
NOTIFY_METHOD=POST

copy_config () {
  rsync -a --exclude='.*' $REPODIR/ $CONFIGDIR
  echo "config deployed"
}

notify_endpoints () {
  for IP in $(dig $NOTIFY_HOST +short); do curl -X $NOTIFY_METHOD $IP:$NOTIFY_PORT$NOTIFY_PATH; done
  echo "endpoints notified"
}

git_initial_clone () {
    git clone $REPO $REPODIR
    echo "initial clone"
}

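git_no_repo () {
    git init
    git remote add origin $REPO
    git pull origin master --force
    # Track origin/master so that later runs can resolve @{u}
    git branch --set-upstream-to=origin/master
    echo "clone to existing non-repo dir"
}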

if [[ ! -d $REPODIR ]]; then
    git_initial_clone
    copy_config
    notify_endpoints
else
    cd $REPODIR
    if [ ! -d .git ]; then
        git_no_repo
        copy_config
        notify_endpoints
    else
        git remote update
        UPSTREAM=${1:-'@{u}'}
        LOCAL=$(git rev-parse @)
        REMOTE=$(git rev-parse "$UPSTREAM")
        BASE=$(git merge-base @ "$UPSTREAM")

        if [ "$LOCAL" = "$REMOTE" ]; then
            echo "Up-to-date"
        elif [ "$LOCAL" = "$BASE" ]; then
            git pull --force
            echo "changes pulled"
            copy_config
            notify_endpoints
        fi
    fi
fi
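The core of the script is a comparison of three commits: LOCAL (git rev-parse @), REMOTE (git rev-parse @{u}) and BASE (git merge-base @ @{u}). Here is the same decision table as a small Python function, for clarity. sync_action is a hypothetical name, and the last two outcomes are cases where the script above falls through without doing anything.

```python
def sync_action(local: str, remote: str, base: str) -> str:
    """Classify the repo state from three commit hashes.

    local  = git rev-parse @
    remote = git rev-parse @{u}
    base   = git merge-base @ @{u}
    """
    if local == remote:
        return "up-to-date"   # nothing to deploy
    if local == base:
        return "pull"         # remote is ahead: pull, copy config, notify
    if remote == base:
        return "local-ahead"  # local commits not on the remote
    return "diverged"         # both sides moved on


print(sync_action("a1", "a1", "a1"))
print(sync_action("a1", "b2", "a1"))
```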

And here is an example docker-compose file that puts it all together.

---
version: '3.7'
services:
  scheduler:
    image: nralbers/scheduler:latest
    configs:
      - source: prom_update_script
        target: /etc/periodic/1min/update_prometheus
        mode: 0555
      - source: ssh_config
        target: /root/.ssh/config
        mode: 0400
    environment:
      - PROMETHEUS_CONFIG_DIR=/etc/prometheus
      - PROMETHEUS_CONFIG_REPO=<your config git repo>
      - PROMETHEUS_HOST_DNS=tasks.prometheus
      - PROMETHEUS_PORT=9090
      - PROMETHEUS_NOTIFY_PATH=/-/reload
    secrets:
      - source: ssh_key
        target: id_rsa
        mode: 0400
    volumes:
    - prom-config:/etc/prometheus  

  prometheus:
    image: prom/prometheus:latest
    ports:
    - 9090:9090
    command:
    - '--config.file=/etc/prometheus/prometheus.yml'
    - '--web.enable-lifecycle'
    - '--storage.tsdb.path=/prometheus'
    - '--web.console.libraries=/usr/share/prometheus/console_libraries'
    - '--web.console.templates=/usr/share/prometheus/consoles'
    volumes:
    - prom-data:/prometheus
    - prom-config:/etc/prometheus
    depends_on:
    - scheduler
    - cadvisor
  cadvisor:
    image: google/cadvisor:latest
    ports:
    - 8080:8080
    volumes:
    - /:/rootfs:ro
    - /var/run:/var/run:rw
    - /sys:/sys:ro
    - /var/lib/docker/:/var/lib/docker:ro
configs:
  ssh_config:
    file: ssh_config
  prom_update_script:
    file: update_prometheus.sh
secrets:
  ssh_key:
    file: ${HOME}/.ssh/id_rsa
volumes:
  prom-config: {}
  prom-data: {}

The scheduler image is an alpine image modified to add some extra cron schedules and the tools needed to run the update script. It is hosted on Docker Hub, and the source code is here:
https://github.com/nralbers/docker-scheduler

FROM alpine:latest
LABEL maintainer="nralbers@gmail.com"
LABEL version="1.0"
LABEL description="Image running crond with additional schedule options for /etc/periodic/1min and /etc/periodic/5min. \
The image has bash, bind-tools, git & openssh installed. To use: bind mount the scripts you want to schedule to /etc/periodic/<period>"
RUN apk update && apk add bash bind-tools openssh git curl rsync
RUN mkdir -p /etc/periodic/1min && echo "*       *       *       *       *       run-parts /etc/periodic/1min" >> /etc/crontabs/root
RUN mkdir -p /etc/periodic/5min && echo "*/5     *       *       *       *       run-parts /etc/periodic/5min" >> /etc/crontabs/root
ENTRYPOINT ["crond", "-f", "-d", "8"]

General applications

It should be clear that, while this example is aimed at dockerised deployments of prometheus, the approach will also work in other situations as long as the application has the following properties:

  • The application uses configuration files for configuration
  • It has an external means of forcing a configuration reload
  • It is possible to share the configuration storage volume between the application container and the script that pulls the updated config from version control and triggers the reload

Next steps…

  • Add a mechanism to validate the config before pushing to the destination system. Ideally, this should also happen on push to the configuration repository.
  • Show how to run different versions of the config from the same repository using deployment branches for dev, acceptance and production
  • Add a mechanism to secure the configuration in the repository. This could be achieved using ansible-vault to encrypt the configuration before check-in and decrypt it on pull. The encryption key can be attached to the scheduler container using a docker secret.