Skip to content

Checks

Django Built-in Services

health_check.Cache dataclass

Bases: HealthCheck

Check that the cache backend is able to set and get a value.

Parameters:

Name Type Description Default
alias str

The cache alias to test against.

'default'
Source code in health_check/cache/backends.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@dataclasses.dataclass
class CacheBackend(HealthCheck):
    """
    Check that the cache backend is able to set and get a value.

    Args:
        alias: The cache alias to test against.

    """

    alias: str = dataclasses.field(default="default")
    cache_key: str = dataclasses.field(
        default=getattr(settings, "HEALTHCHECK_CACHE_KEY", "djangohealthcheck_test"), repr=False
    )

    def check_status(self):
        cache = caches[self.alias]
        ts = datetime.datetime.now().timestamp()
        try:
            cache.set(self.cache_key, f"itworks-{ts}")
            if not cache.get(self.cache_key) == f"itworks-{ts}":
                raise ServiceUnavailable(f"Cache key {self.cache_key} does not match")
        except CacheKeyWarning as e:
            self.add_error(ServiceReturnedUnexpectedResult("Cache key warning"), e)
        except ValueError as e:
            self.add_error(ServiceReturnedUnexpectedResult("ValueError"), e)
        except (ConnectionError, RedisError) as e:
            self.add_error(ServiceReturnedUnexpectedResult("Connection Error"), e)

health_check.Database dataclass

Bases: HealthCheck

Check database connectivity by executing a simple SELECT 1 query.

Parameters:

Name Type Description Default
alias str

The alias of the database connection to check.

'default'
Source code in health_check/contrib/db_heartbeat/backends.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@dataclasses.dataclass
class DatabaseHeartBeatCheck(HealthCheck):
    """
    Check database connectivity by executing a simple SELECT 1 query.

    Args:
        alias: The alias of the database connection to check.

    """

    alias: str = dataclasses.field(default="default")

    def check_status(self):
        connection = connections[self.alias]
        try:
            result = None
            compiler = connection.ops.compiler("SQLCompiler")(SelectOne(), connection, None)
            with connection.cursor() as cursor:
                cursor.execute(*compiler.compile(SelectOne()))
                result = cursor.fetchone()

            if result != (1,):
                raise ServiceUnavailable("Health Check query did not return the expected result.")
        except Exception as e:
            raise ServiceUnavailable(f"Database health check failed: {e}")

health_check.Disk dataclass

Bases: HealthCheck

Check system disk usage.

Parameters:

Name Type Description Default
path Path | str

Path to check disk usage for.

getcwd()
max_disk_usage_percent float | None

Maximum disk usage in percent or None to disable the check.

HEALTH_CHECK['DISK_USAGE_MAX']
Source code in health_check/contrib/psutil/backends.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@dataclasses.dataclass()
class DiskUsage(HealthCheck):
    """
    Check system disk usage.

    Args:
        path: Path to check disk usage for.
        max_disk_usage_percent: Maximum disk usage in percent or None to disable the check.

    """

    path: pathlib.Path | str = dataclasses.field(default=os.getcwd())
    max_disk_usage_percent: float | None = dataclasses.field(default=HEALTH_CHECK["DISK_USAGE_MAX"], repr=False)
    hostname: str = dataclasses.field(default_factory=socket.gethostname, init=False)

    def check_status(self):
        try:
            du = psutil.disk_usage(str(self.path))
            if self.max_disk_usage_percent and du.percent >= self.max_disk_usage_percent:
                raise ServiceWarning(f"{du.percent}\u202f% disk usage")
        except ValueError as e:
            self.add_error(ServiceReturnedUnexpectedResult("ValueError"), e)

health_check.Mail dataclass

Bases: HealthCheck

Check that mail backend is able to open and close connection.

Parameters:

Name Type Description Default
backend str

The email backend to test against.

EMAIL_BACKEND
timeout timedelta

Timeout for connection to mail server.

timedelta(seconds=get('MAIL_TIMEOUT', 15))
Source code in health_check/contrib/mail/backends.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@dataclasses.dataclass
class MailHealthCheck(backends.HealthCheck):
    """
    Check that mail backend is able to open and close connection.

    Args:
        backend: The email backend to test against.
        timeout: Timeout for connection to mail server.

    """

    backend: str = settings.EMAIL_BACKEND
    timeout: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(seconds=conf.HEALTH_CHECK.get("MAIL_TIMEOUT", 15)),
        repr=False,
    )

    def check_status(self) -> None:
        connection: BaseEmailBackend = get_connection(self.backend, fail_silently=False)
        connection.timeout = self.timeout.total_seconds()
        logger.debug("Trying to open connection to mail backend.")
        try:
            connection.open()
        except smtplib.SMTPException as error:
            self.add_error(
                error=exceptions.ServiceUnavailable(
                    "Failed to open connection with SMTP server",
                ),
                cause=error,
            )
        except ConnectionRefusedError as error:
            self.add_error(
                error=exceptions.ServiceUnavailable(
                    "Connection refused error",
                ),
                cause=error,
            )
        except BaseException as error:
            self.add_error(
                error=exceptions.ServiceUnavailable(
                    f"Unknown error {error.__class__}",
                ),
                cause=error,
            )
        finally:
            connection.close()
        logger.debug("Connection established. Mail backend %r is healthy.", self.backend)

health_check.Memory dataclass

Bases: HealthCheck

Check system memory usage.

Parameters:

Name Type Description Default
min_gibibytes_available float | None

Minimum available memory in gibibytes or None to disable the check.

None
max_memory_usage_percent float | None

Maximum memory usage in percent or None to disable the check.

90.0
Source code in health_check/contrib/psutil/backends.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@dataclasses.dataclass()
class MemoryUsage(HealthCheck):
    """
    Check system memory usage.

    Args:
        min_gibibytes_available: Minimum available memory in gibibytes or None to disable the check.
        max_memory_usage_percent: Maximum memory usage in percent or None to disable the check.

    """

    min_gibibytes_available: float | None = dataclasses.field(default=None, repr=False)
    max_memory_usage_percent: float | None = dataclasses.field(default=90.0, repr=False)
    hostname: str = dataclasses.field(default_factory=socket.gethostname, init=False)

    def check_status(self):
        try:
            memory = psutil.virtual_memory()
            available_gibi = memory.available / (1024**3)
            total_gibi = memory.total / (1024**3)
            msg = f"RAM {available_gibi:.1f}/{total_gibi:.1f}GiB ({memory.percent}\u202f%)"
            if self.min_gibibytes_available and available_gibi < self.min_gibibytes_available:
                raise ServiceWarning(msg)
            if self.max_memory_usage_percent and memory.percent >= self.max_memory_usage_percent:
                raise ServiceWarning(msg)
        except ValueError as e:
            self.add_error(ServiceReturnedUnexpectedResult("ValueError"), e)

health_check.Storage dataclass

Bases: HealthCheck

Check file storage backends by saving, reading, and deleting a test file.

Parameters:

Name Type Description Default
alias str

The alias of the storage backend to check. Defaults to "default".

'default'
Source code in health_check/storage/backends.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@dataclasses.dataclass
class StorageHealthCheck(HealthCheck):
    """
    Check file storage backends by saving, reading, and deleting a test file.

    Args:
        alias (str): The alias of the storage backend to check. Defaults to "default".

    """

    alias: str = "default"

    def get_storage(self):
        return storages[self.storage_alias if hasattr(self, "storage_alias") else self.alias]

    def get_file_name(self):
        return f"health_check_storage_test/test-{uuid.uuid4()}.txt"

    def get_file_content(self):
        return f"# generated by health_check.Storage at {datetime.datetime.now().timestamp()}".encode()

    def check_save(self, file_name, file_content):
        storage = self.get_storage()
        # save the file
        file_name = storage.save(file_name, ContentFile(content=file_content))
        # read the file and compare
        if not storage.exists(file_name):
            raise ServiceUnavailable("File does not exist")
        with storage.open(file_name) as f:
            if not f.read() == file_content:
                raise ServiceUnavailable("File content does not match")
        return file_name

    def check_delete(self, file_name):
        storage = self.get_storage()
        # delete the file and make sure it is gone
        storage.delete(file_name)
        if storage.exists(file_name):
            raise ServiceUnavailable("File was not deleted")

    def check_status(self):
        try:
            # write the file to the storage backend
            file_name = self.get_file_name()
            file_content = self.get_file_content()
            file_name = self.check_save(file_name, file_content)
            self.check_delete(file_name)
            return True
        except ServiceUnavailable as e:
            raise e
        except Exception as e:
            raise ServiceUnavailable("Unknown exception") from e

3rd Party Services

To use the checks, you will need to install and set up their corresponding dependencies.

health_check.contrib.celery.Ping dataclass

Bases: HealthCheck

Check Celery worker availability using the ping control command.

Parameters:

Name Type Description Default
timeout timedelta

Timeout duration for the ping command.

timedelta(seconds=getattr(settings, 'HEALTHCHECK_CELERY_PING_TIMEOUT', 1))
Source code in health_check/contrib/celery_ping/backends.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@dataclasses.dataclass
class CeleryPingHealthCheck(HealthCheck):
    """
    Check Celery worker availability using the ping control command.

    Args:
        timeout: Timeout duration for the ping command.

    """

    CORRECT_PING_RESPONSE = {"ok": "pong"}
    timeout: datetime.timedelta = datetime.timedelta(seconds=getattr(settings, "HEALTHCHECK_CELERY_PING_TIMEOUT", 1))

    def check_status(self):
        try:
            ping_result = app.control.ping(timeout=self.timeout.total_seconds())
        except OSError as e:
            self.add_error(ServiceUnavailable("IOError"), e)
        except NotImplementedError as exc:
            self.add_error(
                ServiceUnavailable("NotImplementedError: Make sure CELERY_RESULT_BACKEND is set"),
                exc,
            )
        except BaseException as exc:
            self.add_error(ServiceUnavailable("Unknown error"), exc)
        else:
            if not ping_result:
                self.add_error(
                    ServiceUnavailable("Celery workers unavailable"),
                )
            else:
                self._check_ping_result(ping_result)

    def _check_ping_result(self, ping_result):
        active_workers = []

        for result in ping_result:
            worker, response = list(result.items())[0]
            if response != self.CORRECT_PING_RESPONSE:
                self.add_error(
                    ServiceUnavailable(f"Celery worker {worker} response was incorrect"),
                )
                continue
            active_workers.append(worker)

        if not self.errors:
            self._check_active_queues(active_workers)

    def _check_active_queues(self, active_workers):
        defined_queues = getattr(app.conf, "task_queues", None) or getattr(app.conf, "CELERY_QUEUES", None)

        if not defined_queues:
            return

        defined_queues = {queue.name for queue in defined_queues}
        active_queues = set()

        for queues in app.control.inspect(active_workers).active_queues().values():
            active_queues.update([queue.get("name") for queue in queues])

        for queue in defined_queues.difference(active_queues):
            self.add_error(
                ServiceUnavailable(f"No worker for Celery task queue {queue}"),
            )

health_check.contrib.rabbitmq.RabbitMQ dataclass

Bases: HealthCheck

Check RabbitMQ service by opening and closing a broker channel.

Parameters:

Name Type Description Default
namespace str

Optional namespace for the broker URL setting.

None
Source code in health_check/contrib/rabbitmq/backends.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@dataclasses.dataclass
class RabbitMQHealthCheck(HealthCheck):
    """
    Check RabbitMQ service by opening and closing a broker channel.

    Args:
        namespace: Optional namespace for the broker URL setting.

    """

    namespace: str = None

    def check_status(self):
        logger.debug("Checking for a broker_url on django settings...")

        broker_url_setting_key = f"{self.namespace}_BROKER_URL" if self.namespace else "BROKER_URL"
        broker_url = getattr(settings, broker_url_setting_key, None)

        logger.debug("Got %s as the broker_url. Connecting to rabbit...", broker_url)

        logger.debug("Attempting to connect to rabbit...")
        try:
            # conn is used as a context to release opened resources later
            with Connection(broker_url) as conn:
                conn.connect()  # exceptions may be raised upon calling connect
        except ConnectionRefusedError as e:
            self.add_error(
                ServiceUnavailable("Unable to connect to RabbitMQ: Connection was refused."),
                e,
            )

        except AccessRefused as e:
            self.add_error(
                ServiceUnavailable("Unable to connect to RabbitMQ: Authentication error."),
                e,
            )

        except OSError as e:
            self.add_error(ServiceUnavailable("IOError"), e)

        except BaseException as e:
            self.add_error(ServiceUnavailable("Unknown error"), e)
        else:
            logger.debug("Connection established. RabbitMQ is healthy.")

health_check.contrib.redis.Redis dataclass

Bases: HealthCheck

Check Redis service by pinging the redis instance with a redis connection.

Parameters:

Name Type Description Default
redis_url str

The Redis connection URL.

getattr(settings, 'REDIS_URL', 'redis://localhost/1')
redis_url_options dict[str, Any]

Additional options for the Redis connection.

getattr(settings, 'HEALTHCHECK_REDIS_URL_OPTIONS', None)
Source code in health_check/contrib/redis/backends.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@dataclasses.dataclass
class RedisHealthCheck(HealthCheck):
    """
    Check Redis service by pinging the redis instance with a redis connection.

    Args:
        redis_url: The Redis connection URL.
        redis_url_options: Additional options for the Redis connection.

    """

    redis_url: str = dataclasses.field(default=getattr(settings, "REDIS_URL", "redis://localhost/1"), repr=False)
    redis_url_options: dict[str, typing.Any] = dataclasses.field(
        default=getattr(settings, "HEALTHCHECK_REDIS_URL_OPTIONS", None), repr=False
    )

    def check_status(self):
        logger.debug("Got %s as the redis_url. Connecting to redis...", self.redis_url)

        logger.debug("Attempting to connect to redis...")
        try:
            # conn is used as a context to release opened resources later
            with from_url(self.redis_url, **(self.redis_url_options or {})) as conn:
                conn.ping()  # exceptions may be raised upon ping
        except ConnectionRefusedError as e:
            self.add_error(
                ServiceUnavailable("Unable to connect to Redis: Connection was refused."),
                e,
            )
        except exceptions.TimeoutError as e:
            self.add_error(ServiceUnavailable("Unable to connect to Redis: Timeout."), e)
        except exceptions.ConnectionError as e:
            self.add_error(ServiceUnavailable("Unable to connect to Redis: Connection Error"), e)
        except BaseException as e:
            self.add_error(ServiceUnavailable("Unknown error"), e)
        else:
            logger.debug("Connection established. Redis is healthy.")