Django and vault


When using dynamic database credentials from Vault with Django, we need to make sure that the Django instance picks up the right credentials, renews them when necessary, and uses the right roles.

This post includes the background and the necessary code.

Migration and creation

Migration and creation pose special problems because they modify database objects. For this, we either need to assume the role that owns those objects (as mentioned above) or use a separate user.

I intend to try using a separate user for migrations in the future, but for now, I have a single role that the temporary user will assume which has access to read and write as well as own and maintain the tables, sequences, etc.

Because the temporary user can create objects, anything it creates would be owned by a short-lived user, and that ownership causes problems later.

I'm using the database option assume_role to assume a permanent PostgreSQL role after connecting, which limits ownership confusion. Support for assuming a role for the session was added in Django 4.2 and is enabled by adding 'OPTIONS': {'assume_role': 'test-owner'} to the database definition in your configuration.
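As a rough illustration, the option could sit in a settings file like the sketch below. Only assume_role and the role name test-owner come from this post; the database name, host, and port are placeholder values assumed for the example.

```python
# settings.py (sketch). NAME, HOST and PORT are hypothetical placeholders.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "appdb",      # hypothetical database name
        "HOST": "localhost",  # hypothetical host
        "PORT": "5432",
        "OPTIONS": {
            # Django 4.2+: permanent role to assume for the session after
            # connecting, so new objects are owned by this role rather than
            # by the temporary user.
            "assume_role": "test-owner",
        },
    }
}
```

With this in place, the temporary user only needs to be a member of test-owner; the tables and sequences stay owned by the permanent role.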

Renewing credentials

To renew the credentials, we need to wrap database access so that new credentials are retrieved when required.

For this, I took inspiration from the AWS samples for Secrets Manager rotation, which use nearly the same mechanism, but with Secrets Manager instead of Vault.

Effectively, I created a new database backend in my-app/db/backends/postgresql, using django.db.backends.postgresql as a base, and then referred to my-app.db.backends.postgresql as the database ENGINE in the DATABASES configuration stanza.

In the code below, DatabaseCredentials stores the database credentials while they are live. The DatabaseWrapper keeps a DatabaseCredentials instance as an instance attribute, retrieving the credentials at init time and passing along the settings_dict from the original DATABASES block so that we can pick up any salient information.

There are a number of Vault-related parameters, all prefixed with VAULT_:

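| Parameter         | Required | Purpose                           | Default  |
|-------------------|----------|-----------------------------------|----------|
| VAULT_ROLE        | *        | Vault dynamic role name           | None     |
| VAULT_STATIC_ROLE | *        | Vault static role name            | None     |
| VAULT_MOUNT_POINT |          | Database secret store mount point | database |
| VAULT_ADDR        |          | URL for the Vault server          | None     |
| VAULT_TOKEN       |          | Token for accessing Vault         | None     |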

*: At least one of VAULT_ROLE and VAULT_STATIC_ROLE must be included.
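The "at least one" rule could be checked up front with a small helper along these lines. This is only a sketch: check_vault_roles is a hypothetical name that is not part of the backend shown later, and it raises ValueError to stay self-contained, where a real Django backend would more likely raise ImproperlyConfigured. It assumes the static role takes precedence when both are set.

```python
def check_vault_roles(settings_dict: dict) -> str:
    """Return "static" or "dynamic" depending on which role is configured.

    Hypothetical helper: the static role wins when both are present.
    Raises ValueError when neither VAULT_ROLE nor VAULT_STATIC_ROLE is set.
    """
    if settings_dict.get("VAULT_STATIC_ROLE"):
        return "static"
    if settings_dict.get("VAULT_ROLE"):
        return "dynamic"
    raise ValueError("Set at least one of VAULT_ROLE or VAULT_STATIC_ROLE")
```

Failing early like this gives a clearer message than whatever error Vault returns when asked for credentials with no role name.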

If either VAULT_ADDR or VAULT_TOKEN is empty, the hvac library will supply its default for that value, reading first from the environment and then falling back to static defaults.
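Put together, a DATABASES stanza using these parameters might look roughly like the following. The ENGINE path, the mount point default, and the test-owner role come from this post; the database name, host, port, and the Vault role name my-app-dynamic are hypothetical values for illustration. Note that Django loads the module named base inside the ENGINE package, so this assumes the wrapper code lives in a base.py there.

```python
# settings.py (sketch). Values marked hypothetical are placeholders.
DATABASES = {
    "default": {
        # Custom backend that wraps django.db.backends.postgresql.
        "ENGINE": "my-app.db.backends.postgresql",
        "NAME": "appdb",      # hypothetical database name
        "HOST": "localhost",  # hypothetical host
        "PORT": "5432",
        # Vault settings read by the custom backend.
        "VAULT_ROLE": "my-app-dynamic",   # hypothetical dynamic role name
        "VAULT_MOUNT_POINT": "database",  # same as the default
        # VAULT_ADDR and VAULT_TOKEN are omitted here so that hvac
        # falls back to its own defaults, such as the environment.
        "OPTIONS": {"assume_role": "test-owner"},
    }
}
```

USER and PASSWORD are left out on purpose, since the backend fills them in from Vault at connection time.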

The DatabaseWrapper overrides get_new_connection: it adds the current credentials to the connection parameters and forwards them to the underlying wrapper. If authentication fails, it refreshes the credentials and tries once more.

import logging
import hvac
from django.core.exceptions import ImproperlyConfigured
from django.db import DEFAULT_DB_ALIAS
from django.db.backends.postgresql import base

try:
    try:
        # noinspection PyPep8Naming
        import psycopg as Database
    except ImportError:
        # noinspection PyPep8Naming
        import psycopg2 as Database
except ImportError:
    raise ImproperlyConfigured("Error loading psycopg2 or psycopg module")

logger = logging.getLogger(__name__)


class DatabaseCredentials:
    def __init__(self, settings_dict: dict):
        self.creds = None
        logger.info("init vault credentials")
        self.credential_name = settings_dict.get("VAULT_ROLE", None)
        self.static_credential_name = settings_dict.get("VAULT_STATIC_ROLE", None)
        self.mount_point = settings_dict.get("VAULT_MOUNT_POINT", "database")
        self.vault_url = settings_dict.get("VAULT_ADDR", None)
        self.vault_token = settings_dict.get("VAULT_TOKEN", None)
        self.client = hvac.Client(url=self.vault_url, token=self.vault_token)
        self.refresh_now()

    def get_conn_params_from_vault(self, conn_params):
        conn_params["user"] = self.creds["username"]
        conn_params["password"] = self.creds["password"]
        logger.info(f"Getting db creds: user={self.creds['username']}")
        return

    def refresh_now(self):
        # Prefer the static role when one is configured; otherwise use the dynamic role.
        role = self.static_credential_name or self.credential_name
        logger.info(f"refreshing credentials for {role}")
        if self.static_credential_name:
            our_creds = self.client.secrets.database.get_static_credentials(
                self.static_credential_name, mount_point=self.mount_point
            )
        else:
            our_creds = self.client.secrets.database.generate_credentials(
                self.credential_name, mount_point=self.mount_point
            )

        # Vault returns the username and password under the "data" key.
        self.creds = our_creds["data"]


class DatabaseWrapper(base.DatabaseWrapper):
    def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS):
        self.database_credentials = DatabaseCredentials(settings_dict)
        super().__init__(settings_dict, alias)

    def get_new_connection(self, conn_params):
        try:
            logger.info("get connection")
            self.database_credentials.get_conn_params_from_vault(conn_params)
            conn = super(DatabaseWrapper, self).get_new_connection(conn_params)
            return conn
        except Database.OperationalError as e:
            # there doesn't appear to be a good way to check for a specific error
            # other than to read the string and look for "authentication failed"
            if "authentication failed" not in str(e):
                raise

            logger.info("Authentication error. Going to refresh secret and try again.")
            self.database_credentials.refresh_now()
            self.database_credentials.get_conn_params_from_vault(conn_params)
            conn = super(DatabaseWrapper, self).get_new_connection(conn_params)
            logger.info(
                "Successfully refreshed secret and established new database connection."
            )
            return conn