Skip to content

Checks

Django Built-in Services

health_check.Cache dataclass

Bases: HealthCheck

Check that the cache backend is able to set and get a value.

It can be set up multiple times for different cache aliases if needed.

Parameters:

Name Type Description Default
alias str

The cache alias to test against.

'default'
key_prefix str

Prefix for the node specific cache key.

'djangohealthcheck_test'
timeout timedelta

Time until probe keys expire in the cache backend.

timedelta(seconds=5)
Source code in health_check/checks.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@dataclasses.dataclass
class Cache(HealthCheck):
    """
    Check that the cache backend is able to set and get a value.

    It can be set up multiple times for different cache aliases if needed.

    Args:
        alias: The cache alias to test against.
        key_prefix: Prefix for the node specific cache key.
        timeout: Time until probe keys expire in the cache backend.

    """

    alias: str = "default"
    key_prefix: str = dataclasses.field(default="djangohealthcheck_test", repr=False)
    timeout: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(seconds=5), repr=False
    )

    async def run(self):
        try:
            backend = caches[self.alias]
        except InvalidCacheBackendError as e:
            raise ServiceUnavailable("Cache alias does not exist") from e
        # Use an isolated key per probe run to avoid cross-process write races.
        probe_key = f"{self.key_prefix}:{uuid.uuid4().hex}"
        # Timestamp in the value makes each probe payload unique as well.
        probe_value = f"itworks-{datetime.datetime.now().timestamp()}"
        try:
            await backend.aset(
                probe_key,
                probe_value,
                timeout=self.timeout.total_seconds(),
            )
            # Read the probe back and verify the round trip was lossless.
            stored = await backend.aget(probe_key)
            if stored != probe_value:
                raise ServiceUnavailable(f"Cache key {probe_key} does not match")
        except CacheKeyWarning as e:
            raise ServiceReturnedUnexpectedResult("Cache key warning") from e
        except ValueError as e:
            raise ServiceReturnedUnexpectedResult("ValueError") from e
        except (ConnectionError, RedisError) as e:
            raise ServiceReturnedUnexpectedResult("Connection Error") from e

health_check.DNS dataclass

Bases: HealthCheck

Check DNS resolution by resolving the server's hostname.

Verifies that DNS resolution is working using the system's configured DNS servers, as well as nameserver resolution for the provided hostname.

Parameters:

Name Type Description Default
hostname str

The hostname to resolve.

gethostname()
timeout timedelta

DNS query timeout.

timedelta(seconds=5)
Source code in health_check/checks.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@dataclasses.dataclass
class DNS(HealthCheck):
    """
    Check DNS resolution by resolving the server's hostname.

    Verifies that DNS resolution is working using the system's configured
    DNS servers, as well as nameserver resolution for the provided hostname.

    Args:
        hostname: The hostname to resolve.
        timeout: DNS query timeout.
        nameservers: Explicit nameservers to query instead of the system
            resolvers, or None to use the system configuration.

    """

    hostname: str = dataclasses.field(default_factory=socket.gethostname)
    timeout: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(seconds=5), repr=False
    )
    # Optional override of the resolver's nameserver list (see Args above).
    nameservers: list[str] | None = dataclasses.field(default=None, repr=False)

    async def run(self):
        logger.debug("Attempting to resolve hostname: %s", self.hostname)

        resolver = dns.asyncresolver.Resolver()
        # lifetime bounds the total time the resolver spends on the query,
        # across all retries and nameservers.
        resolver.lifetime = self.timeout.total_seconds()
        if self.nameservers is not None:
            resolver.nameservers = self.nameservers

        try:
            # Perform DNS resolution (A record by default)
            answers = await resolver.resolve(self.hostname, "A")
        except dns.resolver.NXDOMAIN as e:
            raise ServiceUnavailable(
                f"DNS resolution failed: hostname {self.hostname} does not exist"
            ) from e
        except dns.resolver.NoAnswer as e:
            raise ServiceUnavailable(
                f"DNS resolution failed: no answer for {self.hostname}"
            ) from e
        except dns.resolver.Timeout as e:
            raise ServiceUnavailable(
                f"DNS resolution failed: timeout resolving {self.hostname}"
            ) from e
        except dns.resolver.NoNameservers as e:
            raise ServiceUnavailable(
                "DNS resolution failed: no nameservers available"
            ) from e
        except dns.exception.DNSException as e:
            # Catch-all for any other dnspython failure mode.
            raise ServiceUnavailable(f"DNS resolution failed: {e}") from e
        else:
            logger.debug(
                "Successfully resolved %s to %s",
                self.hostname,
                [str(rdata) for rdata in answers],
            )

health_check.Database dataclass

Bases: HealthCheck

Check database operation by executing a simple SELECT 1 query.

It can be set up multiple times for different database connections if needed. No actual data is read from or written to the database to minimize the performance impact and work with conservative database user permissions.

Parameters:

Name Type Description Default
alias str

The alias of the database connection to check.

'default'
Source code in health_check/checks.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@dataclasses.dataclass
class Database(HealthCheck):
    """
    Check database operation by executing a simple SELECT 1 query.

    It can be set up multiple times for different database connections if needed.
    No actual data is read from or written to the database to minimize the performance impact
    and work with conservative database user permissions.

    Args:
        alias: The alias of the database connection to check.

    """

    alias: str = "default"

    def run(self):
        try:
            connection = connections[self.alias]
        except ConnectionDoesNotExist as e:
            raise ServiceUnavailable("Database alias does not exist") from e
        result = None
        try:
            # Compile the query through the connection's own SQL compiler so
            # backend-specific dialect differences are handled by Django.
            # NOTE(review): _SelectOne is defined elsewhere in this module —
            # presumably a minimal query object equivalent to "SELECT 1";
            # confirm against the module source.
            compiler = connection.ops.compiler("SQLCompiler")(
                _SelectOne(), connection, None
            )
            with connection.cursor() as cursor:
                cursor.execute(*compiler.compile(_SelectOne()))
                result = cursor.fetchone()
        except db.Error as e:
            # Report only the text before the first colon — presumably to keep
            # driver/connection detail out of the health report; confirm.
            raise ServiceUnavailable(str(e).rsplit(":")[0]) from e
        else:
            if result != (1,):
                raise ServiceUnavailable(
                    "Health Check query did not return the expected result."
                )
        finally:
            # Drop broken or obsolete connections so the pool stays healthy.
            connection.close_if_unusable_or_obsolete()

health_check.Mail dataclass

Bases: HealthCheck

Check that mail backend is able to open and close connection.

Parameters:

Name Type Description Default
backend str

The email backend to test against.

EMAIL_BACKEND
timeout timedelta

Timeout for the connection to the mail server.

timedelta(seconds=15)
Source code in health_check/checks.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
@dataclasses.dataclass
class Mail(HealthCheck):
    """
    Check that mail backend is able to open and close connection.

    Args:
        backend: The email backend to test against.
        timeout: Timeout for the connection to the mail server.

    """

    backend: str = settings.EMAIL_BACKEND
    timeout: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(seconds=15), repr=False
    )

    def run(self) -> None:
        # Build a backend instance and apply the probe's connection timeout.
        mail_connection: BaseEmailBackend = get_connection(
            self.backend, fail_silently=False
        )
        mail_connection.timeout = self.timeout.total_seconds()
        logger.debug("Trying to open connection to mail backend.")
        try:
            # Opening the connection is the actual health probe.
            mail_connection.open()
        except smtplib.SMTPException as e:
            raise ServiceUnavailable(
                "Failed to open connection with SMTP server"
            ) from e
        except ConnectionRefusedError as e:
            raise ServiceUnavailable("Connection refused error") from e
        finally:
            # Always release the connection, even when open() failed.
            mail_connection.close()
        logger.debug(
            "Connection established. Mail backend %r is healthy.", self.backend
        )

health_check.Storage dataclass

Bases: HealthCheck

Check file storage backends by saving, reading, and deleting a test file.

It can be set up multiple times for different storage backends if needed.

Parameters:

Name Type Description Default
alias str

The alias of the storage backend to check.

'default'
Source code in health_check/checks.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
@dataclasses.dataclass
class Storage(HealthCheck):
    """
    Check file storage backends by saving, reading, and deleting a test file.

    It can be set up multiple times for different storage backends if needed.

    Args:
        alias: The alias of the storage backend to check.

    """

    alias: str = "default"

    @property
    def storage(self) -> DjangoStorage:
        """Resolve the configured storage backend for ``self.alias``."""
        try:
            return storages[self.alias]
        except InvalidStorageError as e:
            raise ServiceUnavailable("Storage alias does not exist") from e

    def get_file_name(self):
        # Random suffix keeps concurrent probe runs from colliding on a path.
        return f"health_check_storage_test/test-{uuid.uuid4()}.txt"

    def get_file_content(self):
        return f"# generated by health_check.Storage at {datetime.datetime.now().timestamp()}".encode()

    def check_save(self, file_name, file_content):
        # Persist the probe file; the backend may rename it on save, so keep
        # the name it actually stored under.
        stored_name = self.storage.save(file_name, ContentFile(content=file_content))
        if not self.storage.exists(stored_name):
            raise ServiceUnavailable("File does not exist")
        # Read the file back and confirm the payload round-tripped intact.
        with self.storage.open(stored_name) as handle:
            if handle.read() != file_content:
                raise ServiceUnavailable("File content does not match")
        return stored_name

    def check_delete(self, file_name):
        # Remove the probe file and verify the backend really dropped it.
        self.storage.delete(file_name)
        if self.storage.exists(file_name):
            raise ServiceUnavailable("File was not deleted")

    def run(self):
        # Full write/read/delete round trip against the storage backend.
        stored_name = self.check_save(self.get_file_name(), self.get_file_content())
        self.check_delete(stored_name)

System Services

To use the psutil-based checks, you will need to install the psutil extra:

pip install django-health-check[psutil]

health_check.contrib.psutil.Battery dataclass

Bases: HealthCheck

Warn about system battery status and power connection.

Parameters:

Name Type Description Default
min_percent_available float | None

Minimum battery percentage available or None to disable the check.

20.0
power_plugged bool

Whether to warn if the power is unplugged.

False
Source code in health_check/contrib/psutil.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@dataclasses.dataclass
class Battery(HealthCheck):
    """
    Warn about system battery status and power connection.

    Args:
        min_percent_available: Minimum battery percentage available or None to disable the check.
        power_plugged: Whether to warn if the power is unplugged.

    """

    min_percent_available: float | None = dataclasses.field(default=20.0, repr=False)
    power_plugged: bool = dataclasses.field(default=False, repr=False)
    # Node identity; not a constructor argument.
    hostname: str = dataclasses.field(default_factory=socket.gethostname, init=False)

    def run(self):
        try:
            battery = psutil.sensors_battery()
        except AttributeError as e:
            # Platform builds of psutil without sensors_battery() support.
            raise ServiceUnavailable("Battery information not available") from e
        except ValueError as e:
            raise ServiceReturnedUnexpectedResult("ValueError") from e
        # psutil returns None (rather than raising) on systems without a
        # battery; previously this crashed with AttributeError below.
        if battery is None:
            raise ServiceUnavailable("Battery information not available")
        if (
            self.min_percent_available
            and battery.percent <= self.min_percent_available
        ):
            raise ServiceWarning(f"Battery {battery.percent:.1f}\u202f%")
        if self.power_plugged and not battery.power_plugged:
            raise ServiceWarning(
                f"Power unplugged with battery at {battery.percent:.1f}\u202f%"
            )

health_check.contrib.psutil.CPU dataclass

Bases: HealthCheck

Warn about system CPU utilization.

The utilization represents an interval rather than a point in time measurement. The interval starts with the previous execution of this check and with the current unless an explicit interval is provided. An explicit interval will cause a blocking measurement and increases the execution time of the check by the length of the interval.

Parameters:

Name Type Description Default
max_usage_percent float | None

Maximum CPU usage in percent or None to disable the check.

80.0
interval timedelta | None

The interval to measure CPU usage over or None to use the interval since the last check execution.

None
Source code in health_check/contrib/psutil.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclasses.dataclass
class CPU(HealthCheck):
    """
    Warn about system CPU utilization.

    The utilization represents an interval rather than a point in time measurement.
    The interval starts with the previous execution of this check and with the current
    unless an explicit interval is provided. An explicit interval will cause a blocking
    measurement and increases the execution time of the check by the length of the interval.

    Args:
        max_usage_percent: Maximum CPU usage in percent or None to disable the check.
        interval: The interval to measure CPU usage over or None to use the interval since the last check execution.

    """

    max_usage_percent: float | None = dataclasses.field(default=80.0, repr=False)
    interval: datetime.timedelta | None = dataclasses.field(default=None, repr=False)
    # Node identity; not a constructor argument.
    hostname: str = dataclasses.field(default_factory=socket.gethostname, init=False)

    def run(self):
        # A falsy interval (None or zero-length) means "compare against the
        # previous call", which is psutil's non-blocking mode.
        if self.interval:
            interval_seconds = self.interval.total_seconds()
        else:
            interval_seconds = None
        try:
            usage = psutil.cpu_percent(interval=interval_seconds)
            if self.max_usage_percent and usage >= self.max_usage_percent:
                raise ServiceWarning(f"CPU {usage:.1f}\u202f%")
        except ValueError as e:
            raise ServiceReturnedUnexpectedResult("ValueError") from e

health_check.contrib.psutil.Memory dataclass

Bases: HealthCheck

Warn about system memory utilization.

Parameters:

Name Type Description Default
min_gibibytes_available float | None

Minimum available memory in gibibytes or None to disable the check.

None
max_memory_usage_percent float | None

Maximum memory usage in percent or None to disable the check.

90.0
Source code in health_check/contrib/psutil.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@dataclasses.dataclass()
class Memory(HealthCheck):
    """
    Warn about system memory utilization.

    Args:
        min_gibibytes_available: Minimum available memory in gibibytes or None to disable the check.
        max_memory_usage_percent: Maximum memory usage in percent or None to disable the check.

    """

    min_gibibytes_available: float | None = dataclasses.field(default=None, repr=False)
    max_memory_usage_percent: float | None = dataclasses.field(default=90.0, repr=False)
    # Node identity; not a constructor argument.
    hostname: str = dataclasses.field(default_factory=socket.gethostname, init=False)

    def run(self):
        try:
            stats = psutil.virtual_memory()
        except ValueError as e:
            raise ServiceReturnedUnexpectedResult("ValueError") from e
        gib = 1024**3
        available_gibi = stats.available / gib
        total_gibi = stats.total / gib
        msg = f"RAM {available_gibi:.1f}/{total_gibi:.1f}GiB ({stats.percent}\u202f%)"
        # Warn on either threshold being crossed; both report the same summary.
        too_little_available = (
            self.min_gibibytes_available
            and available_gibi < self.min_gibibytes_available
        )
        usage_too_high = (
            self.max_memory_usage_percent
            and stats.percent >= self.max_memory_usage_percent
        )
        if too_little_available or usage_too_high:
            raise ServiceWarning(msg)

health_check.contrib.psutil.Disk dataclass

Bases: HealthCheck

Warn about disk usage for a given system path.

It can be set up multiple times at different system paths, e.g. one at your application root and one at your media storage root.

Parameters:

Name Type Description Default
path Path | str

Path to check disk usage for.

getcwd()
max_disk_usage_percent float | None

Maximum disk usage in percent or None to disable the check.

90.0
Source code in health_check/contrib/psutil.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@dataclasses.dataclass()
class Disk(HealthCheck):
    """
    Warn about disk usage for a given system path.

    It can be set up multiple times at different system paths,
    e.g. one at your application root and one at your media storage root.

    Args:
        path: Path to check disk usage for.
        max_disk_usage_percent: Maximum disk usage in percent or None to disable the check.

    """

    path: pathlib.Path | str = dataclasses.field(default_factory=os.getcwd)
    max_disk_usage_percent: float | None = dataclasses.field(default=90.0, repr=False)
    # Node identity; not a constructor argument.
    hostname: str = dataclasses.field(default_factory=socket.gethostname, init=False)

    def run(self):
        try:
            usage = psutil.disk_usage(str(self.path))
            threshold = self.max_disk_usage_percent
            if threshold and usage.percent >= threshold:
                raise ServiceWarning(f"{usage.percent}\u202f% disk usage")
        except ValueError as e:
            raise ServiceReturnedUnexpectedResult("ValueError") from e

health_check.contrib.psutil.Temperature dataclass

Bases: HealthCheck

Warn about system temperature.

If no maximum temperature is specified, the sensor's high threshold will be used.

Parameters:

Name Type Description Default
device str | None

The device to check temperature for, e.g. 'coretemp' for CPU temperature on many systems. If None, check all available sensors.

'coretemp'
max_temperature_celsius float | None

Maximum temperature in degree Celsius or None to disable the check.

None
Source code in health_check/contrib/psutil.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@dataclasses.dataclass
class Temperature(HealthCheck):
    """
    Warn about system temperature.

    If no maximum temperature is specified, the sensor's high threshold will be used.
    Sensors that report no high threshold are skipped when no explicit maximum
    is configured.

    Args:
        device: The device to check temperature for, e.g. 'coretemp' for CPU temperature on many systems. If None, check all available sensors.
        max_temperature_celsius: Maximum temperature in degree Celsius or None to disable the check.

    """

    device: str | None = dataclasses.field(default="coretemp")
    max_temperature_celsius: float | None = dataclasses.field(default=None, repr=False)
    # Node identity; not a constructor argument.
    hostname: str = dataclasses.field(default_factory=socket.gethostname, init=False)

    def _check_sensors(self, sensors, label_prefix=""):
        """Raise ServiceWarning for any sensor at or above its threshold."""
        for sensor in sensors:
            threshold = self.max_temperature_celsius or sensor.high
            # psutil may report sensor.high as None; without any threshold
            # there is nothing to compare against (previously a TypeError).
            if threshold is not None and sensor.current >= threshold:
                raise ServiceWarning(
                    f"{label_prefix}{sensor.label} {sensor.current:.1f}\u202f°C"
                )

    def run(self):
        try:
            temperatures = psutil.sensors_temperatures()
        except AttributeError as e:
            # Platform builds of psutil without temperature sensor support.
            raise ServiceUnavailable("Temperature information not available") from e
        if self.device:
            try:
                sensors = temperatures[self.device]
            except KeyError:
                raise ServiceUnavailable(
                    f"Sensor {self.device!r} not found"
                ) from None
            self._check_sensors(sensors)
        else:
            # No device selected: inspect every sensor on every device and
            # include the device name in the warning for context.
            for device, sensors in temperatures.items():
                self._check_sensors(sensors, label_prefix=f"{device} ")

3rd Party Services

To use the checks, you will need to install and set up their corresponding dependencies.

To enable these third-party health checks, install the corresponding extras for the contrib checks:

pip install django-health-check[redis,rabbitmq,celery,kafka]

health_check.contrib.celery.Ping dataclass

Bases: HealthCheck

Check Celery worker availability using the ping control command.

Parameters:

Name Type Description Default
app Celery

Celery application instance to use for the health check, defaults to the default Celery app.

app_or_default()
timeout timedelta

Timeout duration for the ping command.

timedelta(seconds=1)
limit int | None

Maximum number of workers to wait for before returning. If not specified, waits for the full timeout duration. When set, returns immediately after receiving responses from this many workers.

None
Source code in health_check/contrib/celery.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@dataclasses.dataclass
class Ping(HealthCheck):
    """
    Check Celery worker availability using the ping control command.

    Args:
        app: Celery application instance to use for the health check, defaults to the [default Celery app][celery.app.default_app].
        timeout: Timeout duration for the ping command.
        limit: Maximum number of workers to wait for before returning. If not
            specified, waits for the full timeout duration. When set, returns
            immediately after receiving responses from this many workers.

    """

    # Payload each worker is expected to return for a successful ping.
    CORRECT_PING_RESPONSE: typing.ClassVar[dict[str, str]] = {"ok": "pong"}
    app: celery.Celery = dataclasses.field(default_factory=app_or_default)
    timeout: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(seconds=1), repr=False
    )
    limit: int | None = dataclasses.field(default=None, repr=False)

    def run(self):
        try:
            ping_result = self.app.control.ping(
                timeout=self.timeout.total_seconds(), limit=self.limit
            )
        except OSError as e:
            raise ServiceUnavailable("IOError") from e
        except NotImplementedError as e:
            raise ServiceUnavailable(
                "NotImplementedError: Make sure CELERY_RESULT_BACKEND is set"
            ) from e
        else:
            if not ping_result:
                # Empty reply list: no worker responded within the timeout.
                raise ServiceUnavailable("Celery workers unavailable")
            else:
                # active_workers() is a generator that validates each reply
                # and yields the responding worker names; unpacking it here
                # drives the validation before the queue check runs.
                self.check_active_queues(*self.active_workers(ping_result))

    def active_workers(self, ping_result):
        """Yield worker names from ping replies, raising on malformed ones."""
        for result in ping_result:
            # Each ping reply is a single-item mapping: {worker_name: response}.
            worker, response = list(result.items())[0]
            if response != self.CORRECT_PING_RESPONSE:
                raise ServiceUnavailable(
                    f"Celery worker {worker} response was incorrect"
                )
            yield worker

    def check_active_queues(self, *active_workers):
        """Verify every defined task queue is consumed by at least one worker."""
        try:
            defined_queues = {queue.name for queue in self.app.conf.task_queues}
        except TypeError:
            # conf.task_queues may be None
            defined_queues = {self.app.conf.task_default_queue}
        # NOTE(review): inspect(...).active_queues() can return None when no
        # worker replies in time, which would raise AttributeError on
        # .values() here — confirm whether that case is handled upstream.
        active_queues = {
            queue.get("name")
            for queues in self.app.control.inspect(active_workers)
            .active_queues()
            .values()
            for queue in queues
        }

        for queue in defined_queues - active_queues:
            raise ServiceUnavailable(f"No worker for Celery task queue {queue}")

health_check.contrib.kafka.Kafka dataclass

Bases: HealthCheck

Check Kafka service by connecting to a Kafka broker and listing topics.

Parameters:

Name Type Description Default
bootstrap_servers list[str]

List of Kafka bootstrap servers, e.g., ['localhost:9092'].

required
timeout timedelta

Timeout duration for the connection check as a datetime.timedelta.

timedelta(seconds=10)
Source code in health_check/contrib/kafka.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclasses.dataclass
class Kafka(HealthCheck):
    """
    Check Kafka service by connecting to a Kafka broker and listing topics.

    Args:
        bootstrap_servers: List of Kafka bootstrap servers, e.g., ['localhost:9092'].
        timeout: Timeout duration for the connection check as a datetime.timedelta.

    """

    bootstrap_servers: list[str]
    timeout: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(seconds=10), repr=False
    )

    async def run(self):
        logger.debug(
            "Connecting to Kafka bootstrap servers %r ...",
            self.bootstrap_servers,
        )

        # Minimal consumer configuration — only used to query cluster metadata.
        timeout_ms = int(self.timeout.total_seconds() * 1000)
        consumer = AIOConsumer(
            {
                "bootstrap.servers": ",".join(self.bootstrap_servers),
                "client.id": "health-check",
                "group.id": "health-check",
                "session.timeout.ms": timeout_ms,
                "socket.timeout.ms": timeout_ms,
            }
        )

        try:
            cluster_metadata = await consumer.list_topics(
                timeout=self.timeout.total_seconds()
            )
            # The broker must answer and report at least one topic.
            if not cluster_metadata or not cluster_metadata.topics:
                raise ServiceUnavailable("Failed to retrieve Kafka topics.")

        except KafkaException as e:
            raise ServiceUnavailable("Unable to connect") from e
        else:
            logger.debug(
                "Connection established. Kafka is healthy. Found %d topics.",
                len(cluster_metadata.topics),
            )
        finally:
            # Always release the consumer, even on failure.
            await consumer.close()

health_check.contrib.rabbitmq.RabbitMQ dataclass

Bases: HealthCheck

Check RabbitMQ service by opening and closing a broker channel.

Parameters:

Name Type Description Default
amqp_url str

The URL of the RabbitMQ broker to connect to, e.g., 'amqp://guest:guest@localhost:5672//'.

required
Source code in health_check/contrib/rabbitmq.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@dataclasses.dataclass
class RabbitMQ(HealthCheck):
    """
    Check RabbitMQ service by opening and closing a broker channel.

    Args:
        amqp_url (str): The URL of the RabbitMQ broker to connect to, e.g., 'amqp://guest:guest@localhost:5672//'.

    """

    amqp_url: str

    async def run(self):
        logger.debug("Attempting to connect to %r...", self.amqp_url)
        try:
            # Establish a robust connection and release it right away; the
            # successful open/close cycle is the actual health probe.
            broker_connection = await aio_pika.connect_robust(self.amqp_url)
            await broker_connection.close()
        except ConnectionRefusedError as e:
            raise ServiceUnavailable(
                "Unable to connect to RabbitMQ: Connection was refused."
            ) from e
        except aio_pika.exceptions.ProbableAuthenticationError as e:
            raise ServiceUnavailable(
                "Unable to connect to RabbitMQ: Authentication error."
            ) from e
        except OSError as e:
            raise ServiceUnavailable("IOError") from e
        else:
            logger.debug("Connection established. RabbitMQ is healthy.")

health_check.contrib.redis.Redis dataclass

Bases: HealthCheck

Check Redis service by pinging a Redis client.

This check works with any Redis client that implements the ping() method, including standard Redis, Sentinel, and Cluster clients.

Parameters:

Name Type Description Default
client_factory Callable[[], Redis | RedisCluster] | None

A callable that returns an instance of a Redis client.

None
client Redis | RedisCluster | None

Deprecated, use client_factory instead.

None

Examples:

Using a standard Redis client:

>>> from redis.asyncio import Redis as RedisClient
>>> Redis(client_factory=lambda: RedisClient(host='localhost', port=6379))

Using from_url to create a client:

>>> from redis.asyncio import Redis as RedisClient
>>> Redis(client_factory=lambda: RedisClient.from_url('redis://localhost:6379'))

Using a Cluster client:

>>> from redis.asyncio import RedisCluster
>>> Redis(client_factory=lambda: RedisCluster(host='localhost', port=7000))

Using a Sentinel client:

>>> from redis.asyncio import Sentinel
>>> Redis(client_factory=lambda: Sentinel([('localhost', 26379)]).master_for('mymaster'))
Source code in health_check/contrib/redis.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@dataclasses.dataclass
class Redis(HealthCheck):
    """
    Check Redis service by pinging a Redis client.

    This check works with any Redis client that implements the ping() method,
    including standard Redis, Sentinel, and Cluster clients.

    Args:
        client_factory: A callable that returns an instance of a Redis client.
        client: Deprecated, use `client_factory` instead.

    Examples:
        Using a standard Redis client:
        >>> from redis.asyncio import Redis as RedisClient
        >>> Redis(client_factory=lambda: RedisClient(host='localhost', port=6379))

        Using from_url to create a client:
        >>> from redis.asyncio import Redis as RedisClient
        >>> Redis(client_factory=lambda: RedisClient.from_url('redis://localhost:6379'))

        Using a Cluster client:
        >>> from redis.asyncio import RedisCluster
        >>> Redis(client_factory=lambda: RedisCluster(host='localhost', port=7000))

        Using a Sentinel client:
        >>> from redis.asyncio import Sentinel
        >>> Redis(client_factory=lambda: Sentinel([('localhost', 26379)]).master_for('mymaster'))

    """

    # Deprecated: a long-lived client whose lifecycle the caller manages.
    client: RedisClient | RedisCluster | None = dataclasses.field(
        repr=False, default=None
    )
    # Preferred: builds a fresh client per run(); that client is closed again
    # in run()'s finally block.
    client_factory: typing.Callable[[], RedisClient | RedisCluster] | None = (
        dataclasses.field(repr=False, default=None)
    )

    def __repr__(self):
        # include client host name and logical database number to identify them
        if self.client_factory is not None:
            # NOTE(review): this builds a throwaway client that is never
            # explicitly closed here — presumably harmless since it is only
            # inspected, not pinged, but confirm against the client library.
            client = self.client_factory()
        else:
            # Use the deprecated client parameter (user manages lifecycle)
            client = self.client

        # Standard clients expose connection_pool.connection_kwargs; only
        # non-sensitive keys (host, port, db) are shown, so passwords in the
        # connection kwargs never leak into the repr.
        try:
            safe_connection_str = ", ".join(
                f"{key}={value!r}"
                for key, value in sorted(
                    client.connection_pool.connection_kwargs.items()
                )
                if key in {"host", "port", "db"}
            )
            return f"Redis({safe_connection_str})"
        except AttributeError:
            # Not a standard client; try the cluster shape next.
            pass

        # Cluster clients expose startup_nodes instead of a connection pool.
        try:
            hosts = [node.name for node in client.startup_nodes]
            return f"Redis(client=RedisCluster(hosts={hosts!r}))"
        except AttributeError:
            # Unknown client shape; fall back to the default dataclass repr.
            return super().__repr__()

    def __post_init__(self):
        # Validate that exactly one of client or client_factory is provided
        if self.client is not None and self.client_factory is not None:
            raise ValueError(
                "Provide exactly one of `client` or `client_factory`, not both."
            )
        if self.client is None and self.client_factory is None:
            raise ValueError(
                "You must provide either `client` (deprecated) or `client_factory` "
                "when instantiating `Redis`."
            )

        # Emit deprecation warning if using the old client parameter
        if self.client is not None:
            warnings.warn(
                "The `client` argument is deprecated and will be removed in a future version. "
                "Please use `client_factory` instead.",
                DeprecationWarning,
                stacklevel=2,
            )

    async def run(self):
        """Ping the server, raising ServiceUnavailable when unreachable."""
        # Create a new client for this health check request
        if self.client_factory is not None:
            client = self.client_factory()
            should_close = True
        else:
            # Use the deprecated client parameter (user manages lifecycle)
            client = self.client
            should_close = False

        logger.debug("Pinging Redis client...")
        try:
            await client.ping()
        except ConnectionRefusedError as e:
            # A refused TCP connection surfaces as the builtin
            # ConnectionRefusedError (an OSError subclass).
            raise ServiceUnavailable(
                "Unable to connect to Redis: Connection was refused."
            ) from e
        except exceptions.TimeoutError as e:
            raise ServiceUnavailable("Unable to connect to Redis: Timeout.") from e
        except exceptions.ConnectionError as e:
            raise ServiceUnavailable(
                "Unable to connect to Redis: Connection Error"
            ) from e
        else:
            logger.debug("Connection established. Redis is healthy.")
        finally:
            # Only close clients created by client_factory
            if should_close:
                await client.aclose()

Cloud Provider Status

Monitor cloud provider service health using their public RSS/Atom status feeds or APIs.

Cloud provider health checks require different extras depending on the provider:

pip install django-health-check[rss,atlassian]

health_check.contrib.rss.AWS dataclass

Bases: Feed

Check AWS service status via their public RSS status feeds.

Parameters:

Name Type Description Default
region str

AWS region code (e.g., 'us-east-1', 'eu-west-1').

required
service str

AWS service name (e.g., 'ec2', 's3', 'rds').

required
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
max_age timedelta

Maximum age for an incident to be considered active.

timedelta(hours=8)
Source code in health_check/contrib/rss.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@dataclasses.dataclass
class AWS(Feed):
    """
    Probe the public AWS status RSS feed for a single region/service pair.

    Args:
        region: AWS region code, e.g. 'us-east-1' or 'eu-west-1'.
        service: AWS service name, e.g. 'ec2', 's3' or 'rds'.
        timeout: How long to wait for the HTTP request.
        max_age: How far back an incident may lie and still count as active.

    """

    region: str
    service: str
    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    max_age: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(hours=8)
    )

    def __post_init__(self):
        # Each service/region pair has its own feed under status.aws.amazon.com.
        slug = f"{self.service}-{self.region}"
        self.feed_url: str = f"https://status.aws.amazon.com/rss/{slug}.rss"

health_check.contrib.rss.Azure dataclass

Bases: Feed

Check Azure platform status via their public RSS status feed.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
max_age timedelta

Maximum age for an incident to be considered active.

timedelta(hours=8)
Source code in health_check/contrib/rss.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@dataclasses.dataclass
class Azure(Feed):
    """
    Probe the public Azure status RSS feed for active incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        max_age: How far back an incident may lie and still count as active.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    max_age: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(hours=8)
    )
    # Fixed feed location; excluded from __init__ and __repr__.
    feed_url: str = dataclasses.field(
        init=False,
        repr=False,
        default="https://rssfeed.azure.status.microsoft/en-us/status/feed/",
    )

health_check.contrib.atlassian.Cloudflare dataclass

Bases: AtlassianStatusPage

Check Cloudflare platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@dataclasses.dataclass
class Cloudflare(AtlassianStatusPage):
    """
    Probe Cloudflare's Atlassian-hosted status page (API v2) for incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        component: Restrict monitoring to one named component; an empty
            string watches every component.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    # Fixed status page root; excluded from __init__ and __repr__.
    base_url: str = dataclasses.field(
        init=False, repr=False, default="https://www.cloudflarestatus.com"
    )
    component: str = ""

health_check.contrib.atlassian.DigitalOcean dataclass

Bases: AtlassianStatusPage

Check DigitalOcean platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@dataclasses.dataclass
class DigitalOcean(AtlassianStatusPage):
    """
    Probe DigitalOcean's Atlassian-hosted status page (API v2) for incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        component: Restrict monitoring to one named component; an empty
            string watches every component.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    # Fixed status page root; excluded from __init__ and __repr__.
    base_url: str = dataclasses.field(
        init=False, repr=False, default="https://status.digitalocean.com"
    )
    component: str = ""

health_check.contrib.atlassian.FlyIo dataclass

Bases: AtlassianStatusPage

Check Fly.io platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@dataclasses.dataclass
class FlyIo(AtlassianStatusPage):
    """
    Probe Fly.io's Atlassian-hosted status page (API v2) for incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        component: Restrict monitoring to one named component; an empty
            string watches every component.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    # Fixed status page root; excluded from __init__ and __repr__.
    base_url: str = dataclasses.field(
        init=False, repr=False, default="https://status.flyio.net"
    )
    component: str = ""

health_check.contrib.atlassian.GitHub dataclass

Bases: AtlassianStatusPage

Check GitHub platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
enterprise_region EnterpriseRegion | None

GitHub Enterprise status page region (if applicable).

None
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@dataclasses.dataclass
class GitHub(AtlassianStatusPage):
    """
    Check GitHub platform status via Atlassian Status Page API v2.

    Args:
        enterprise_region: GitHub Enterprise status page region (if applicable).
        timeout: Request timeout duration.
        component: Name of a specific component to monitor. Monitors all
            components when empty.

    """

    # enum.StrEnum only exists on Python >= 3.11; referencing it on an older
    # interpreter raises AttributeError while this class body executes, which
    # the except clause below absorbs.
    try:

        class EnterpriseRegion(enum.StrEnum):
            """GitHub Enterprise status page regions."""

            australia = "au"
            """Australia."""
            eu = "eu"
            """Europe."""
            japan = "jp"
            """Japan."""
            us = "us"
            """United States."""

        enterprise_region: EnterpriseRegion | None = None
    except AttributeError:
        # Python <3.11 doesn't have StrEnum, so fall back to a plain string field.
        # NOTE(review): no validation is performed on this fallback path, and
        # GitHub.EnterpriseRegion does not exist on these interpreters.
        enterprise_region: str | None = None
    timeout: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(seconds=10), repr=False
    )
    component: str = ""

    def __post_init__(self):
        # The region code doubles as the status-page subdomain; the public
        # (non-enterprise) page lives at www.githubstatus.com.
        self.base_url = f"https://{self.enterprise_region if self.enterprise_region else 'www'}.githubstatus.com"

EnterpriseRegion

Bases: StrEnum

GitHub Enterprise status page regions.

Source code in health_check/contrib/atlassian.py
161
162
163
164
165
166
167
168
169
170
171
# Requires Python 3.11+ (enum.StrEnum). The member values double as the
# status-page subdomain (e.g. "au" -> au.githubstatus.com, see GitHub.__post_init__).
class EnterpriseRegion(enum.StrEnum):
    """GitHub Enterprise status page regions."""

    australia = "au"
    """Australia."""
    eu = "eu"
    """Europe."""
    japan = "jp"
    """Japan."""
    us = "us"
    """United States."""
australia = 'au' class-attribute instance-attribute

Australia.

eu = 'eu' class-attribute instance-attribute

Europe.

japan = 'jp' class-attribute instance-attribute

Japan.

us = 'us' class-attribute instance-attribute

United States.

health_check.contrib.rss.GoogleCloud dataclass

Bases: Feed

Check Google Cloud platform status via their public Atom status feed.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
max_age timedelta

Maximum age for an incident to be considered active.

timedelta(hours=8)
Source code in health_check/contrib/rss.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
@dataclasses.dataclass
class GoogleCloud(Feed):
    """
    Probe the public Google Cloud status Atom feed for active incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        max_age: How far back an incident may lie and still count as active.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    max_age: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(hours=8)
    )
    # Fixed feed location; excluded from __init__ and __repr__.
    feed_url: str = dataclasses.field(
        init=False,
        repr=False,
        default="https://status.cloud.google.com/en/feed.atom",
    )

health_check.contrib.rss.Heroku dataclass

Bases: Feed

Check Heroku platform status via their public RSS status feed.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
max_age timedelta

Maximum age for an incident to be considered active.

timedelta(hours=8)
Source code in health_check/contrib/rss.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@dataclasses.dataclass
class Heroku(Feed):
    """
    Probe the public Heroku status RSS feed for active incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        max_age: How far back an incident may lie and still count as active.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    max_age: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(hours=8)
    )
    # Fixed feed location; excluded from __init__ and __repr__.
    feed_url: str = dataclasses.field(
        init=False, repr=False, default="https://status.heroku.com/feed"
    )

health_check.contrib.rss.Hetzner dataclass

Bases: Feed

Check Hetzner platform status via their public Atom status feed.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
max_age timedelta

Maximum age for an incident to be considered active.

timedelta(hours=8)
Source code in health_check/contrib/rss.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@dataclasses.dataclass
class Hetzner(Feed):
    """
    Probe the public Hetzner status Atom feed for active incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        max_age: How far back an incident may lie and still count as active.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    max_age: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(hours=8)
    )
    # Fixed feed location; excluded from __init__ and __repr__.
    feed_url: str = dataclasses.field(
        init=False, repr=False, default="https://status.hetzner.com/en.atom"
    )

health_check.contrib.atlassian.PlatformSh dataclass

Bases: AtlassianStatusPage

Check Platform.sh platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
@dataclasses.dataclass
class PlatformSh(AtlassianStatusPage):
    """
    Probe Platform.sh's Atlassian-hosted status page (API v2) for incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        component: Restrict monitoring to one named component; an empty
            string watches every component.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    # Fixed status page root; excluded from __init__ and __repr__.
    base_url: str = dataclasses.field(
        init=False, repr=False, default="https://status.platform.sh"
    )
    component: str = ""

health_check.contrib.atlassian.Render dataclass

Bases: AtlassianStatusPage

Check Render platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
@dataclasses.dataclass
class Render(AtlassianStatusPage):
    """
    Probe Render's Atlassian-hosted status page (API v2) for incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        component: Restrict monitoring to one named component; an empty
            string watches every component.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    # Fixed status page root; excluded from __init__ and __repr__.
    base_url: str = dataclasses.field(
        init=False, repr=False, default="https://status.render.com"
    )
    component: str = ""

health_check.contrib.atlassian.Sentry dataclass

Bases: AtlassianStatusPage

Check Sentry platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
@dataclasses.dataclass
class Sentry(AtlassianStatusPage):
    """
    Probe Sentry's Atlassian-hosted status page (API v2) for incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        component: Restrict monitoring to one named component; an empty
            string watches every component.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    # Fixed status page root; excluded from __init__ and __repr__.
    base_url: str = dataclasses.field(
        init=False, repr=False, default="https://status.sentry.io"
    )
    component: str = ""

health_check.contrib.atlassian.Vercel dataclass

Bases: AtlassianStatusPage

Check Vercel platform status via Atlassian Status Page API v2.

Parameters:

Name Type Description Default
timeout timedelta

Request timeout duration.

timedelta(seconds=10)
component str

Name of a specific component to monitor. Monitors all components when empty.

''
Source code in health_check/contrib/atlassian.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
@dataclasses.dataclass
class Vercel(AtlassianStatusPage):
    """
    Probe Vercel's Atlassian-hosted status page (API v2) for incidents.

    Args:
        timeout: How long to wait for the HTTP request.
        component: Restrict monitoring to one named component; an empty
            string watches every component.

    """

    timeout: datetime.timedelta = dataclasses.field(
        repr=False, default=datetime.timedelta(seconds=10)
    )
    # Fixed status page root; excluded from __init__ and __repr__.
    base_url: str = dataclasses.field(
        init=False, repr=False, default="https://www.vercel-status.com"
    )
    component: str = ""

Custom Status Page Feeds

These classes can be used to write custom status page proxy checks. Subclasses need to implement the required attributes as documented.

health_check.contrib.rss.Feed dataclass

Bases: HealthCheck

Base class for cloud provider status feed health checks.

Monitor cloud provider service health via their public RSS or Atom status feeds.

Subclasses must provide:

Attributes:

Name Type Description
feed_url str

The full URL of the RSS or Atom feed to monitor.

timeout timedelta

Maximum duration to wait for the HTTP request before failing.

max_age timedelta

Maximum age for an incident entry to be considered active.

The timeout and max_age values are used to control how long the health check waits for the feed and how far back in time incidents are considered relevant. The feed_url is used to fetch the status feed.

Examples:

>>> import dataclasses
>>> import datetime
>>> import typing
>>> from health_check.contrib.rss import Feed
>>> @dataclasses.dataclass
... class MyProviderStatus(Feed):
...     """Check MyProvider status via its public RSS feed."""
...     timeout: datetime.timedelta = dataclasses.field(
...         default=datetime.timedelta(seconds=10),
...         repr=False,
...     )
...     max_age: datetime.timedelta = dataclasses.field(
...         default=datetime.timedelta(hours=4),
...         repr=False,
...     )
...     feed_url: typing.ClassVar[str] = "https://status.myprovider.com/feed"
Source code in health_check/contrib/rss.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class Feed(HealthCheck):
    """
    Base class for cloud provider status feed health checks.

    Monitor cloud provider service health via their public RSS or Atom status feeds.

    Subclasses must provide:

    Attributes:
        feed_url: The full URL of the RSS or Atom feed to monitor.
        timeout: Maximum duration to wait for the HTTP request before failing.
        max_age: Maximum age for an incident entry to be considered active.

    The `timeout` and `max_age` values are used to control how long the
    health check waits for the feed and how far back in time incidents are
    considered relevant. The `feed_url` is used to fetch the status feed.

    Examples:
        >>> import dataclasses
        >>> import datetime
        >>> import typing
        >>> from health_check.contrib.rss import Feed
        >>> @dataclasses.dataclass
        ... class MyProviderStatus(Feed):
        ...     \"\"\"Check MyProvider status via its public RSS feed.\"\"\"
        ...     timeout: datetime.timedelta = dataclasses.field(
        ...         default=datetime.timedelta(seconds=10),
        ...         repr=False,
        ...     )
        ...     max_age: datetime.timedelta = dataclasses.field(
        ...         default=datetime.timedelta(hours=4),
        ...         repr=False,
        ...     )
        ...     feed_url: typing.ClassVar[str] = "https://status.myprovider.com/feed"

    """

    # NotImplemented sentinels force subclasses to supply concrete values.
    feed_url: str = NotImplemented
    timeout: datetime.timedelta = NotImplemented
    max_age: datetime.timedelta = NotImplemented

    async def run(self):
        """Fetch the feed and raise StatusPageWarning for recent incidents.

        Raises:
            ServiceUnavailable: If the feed cannot be fetched (timeout,
                transport error, or HTTP error status).
            StatusPageWarning: If at least one recent incident entry is found.
        """
        logger.debug("Fetching feed from %s", self.feed_url)

        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    self.feed_url,
                    headers={"User-Agent": f"django-health-check@{__version__}"},
                    timeout=self.timeout.total_seconds(),
                    follow_redirects=True,
                )
            except httpx.TimeoutException as e:
                raise ServiceUnavailable("Feed request timed out") from e
            except httpx.RequestError as e:
                raise ServiceUnavailable(f"Failed to fetch feed: {e}") from e

            try:
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                raise ServiceUnavailable(
                    f"HTTP error {e.response.status_code} fetching feed from {self.feed_url!r}"
                ) from e

            content = response.text

        feed = feedparser.parse(content)

        if feed.bozo:
            # feedparser sets bozo=1 for malformed feeds
            # A bozo feed is still parsed best-effort, so only warn here.
            logger.warning("Feed parsing encountered errors: %s", feed.bozo_exception)

        if not feed.entries:
            logger.debug("No entries found in feed")
            return

        if incidents := list(self._recent_incidents(feed.entries)):
            raise StatusPageWarning(
                # One "title: link" line per incident; fall back to generic
                # text / the feed URL when an entry lacks those attributes.
                "\n".join(
                    f"{getattr(entry, 'title', 'Unknown Incident') or 'Unknown Incident'}:"
                    f" {getattr(entry, 'link', self.feed_url) or self.feed_url}"
                    for entry, _ in incidents
                ),
                # Most recent known incident timestamp, or None when no
                # entry carried a parseable date.
                timestamp=max(
                    filter(None, (date for _, date in incidents)), default=None
                ),
            )

        logger.debug("No recent incidents found in feed")

    def _recent_incidents(self, entries):
        """Yield recent (entry, timestamp) pairs from feed entries."""
        for entry in entries:
            date = self._extract_date(entry)
            # Entries whose date cannot be parsed are conservatively treated
            # as active rather than silently dropped.
            if date is None or self._is_date_recent(date):
                yield entry, date

    def _is_date_recent(self, date):
        """Check if a timestamp falls within the configured max_age window."""
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        # Half-open window (now - max_age, now]; future-dated entries are
        # excluded too.
        return now >= date > now - self.max_age

    def _extract_date(self, entry):
        # feedparser normalizes both RSS and Atom dates to struct_time
        # Try published first, then updated
        for date_field in ["published_parsed", "updated_parsed"]:
            if date_tuple := getattr(entry, date_field, None):
                try:
                    # Convert struct_time to datetime
                    return datetime.datetime(
                        *date_tuple[:6], tzinfo=datetime.timezone.utc
                    )
                except (ValueError, TypeError):
                    logger.warning(
                        "Failed to parse date from entry %r for %r",
                        date_tuple,
                        self.feed_url,
                        exc_info=True,
                    )
        return None

health_check.contrib.atlassian.AtlassianStatusPage dataclass

Bases: HealthCheck

Base class for Atlassian status page health checks.

Monitor cloud provider service health via Atlassian Status Page API v2.

Each subclass should define the base_url for the specific status page and appropriate timeout value.

When component is non-empty, only incidents affecting that named component are reported. Use separate check instances to monitor multiple components independently.

Examples:

>>> import dataclasses
>>> import datetime
>>> from health_check.contrib.atlassian import AtlassianStatusPage
>>> @dataclasses.dataclass
... class FlyIo(AtlassianStatusPage):
...     timeout: datetime.timedelta = datetime.timedelta(seconds=10)
...     base_url: str = dataclasses.field(default="https://status.flyio.net", init=False, repr=False)
Source code in health_check/contrib/atlassian.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class AtlassianStatusPage(HealthCheck):
    """
    Base class for Atlassian status page health checks.

    Monitor cloud provider service health via Atlassian Status Page API v2.

    Each subclass should define the `base_url` for the specific status page
    and appropriate `timeout` value.

    When `component` is non-empty, only incidents affecting that named component are
    reported. Use separate check instances to monitor multiple components independently.

    Examples:
        >>> import dataclasses
        >>> import datetime
        >>> from health_check.contrib.atlassian import AtlassianStatusPage
        >>> @dataclasses.dataclass
        ... class FlyIo(AtlassianStatusPage):
        ...     timeout: datetime.timedelta = datetime.timedelta(seconds=10)
        ...     base_url: str = dataclasses.field(default="https://status.flyio.net", init=False, repr=False)

    """

    # NotImplemented sentinels force subclasses to supply concrete values.
    base_url: str = NotImplemented  # status page root, e.g. "https://status.flyio.net"
    timeout: datetime.timedelta = NotImplemented  # HTTP request timeout
    component: str = ""  # optional component-name filter; empty matches all

    async def run(self):
        """Raise StatusPageWarning when the status page reports open incidents."""
        if incidents := [i async for i in self._fetch_incidents()]:
            raise StatusPageWarning(
                "\n".join(msg for msg, _ in incidents),
                # Surface the most recent update time across all open incidents.
                timestamp=max(ts for _, ts in incidents),
            )
        logger.debug("No incidents found")

    async def _fetch_incidents(self):
        """Yield (message, updated_at) pairs for unresolved incidents.

        Raises:
            ServiceUnavailable: If the API cannot be reached, returns an HTTP
                error status or invalid JSON, or does not know ``component``.
        """
        api_url = f"{self.base_url}/api/v2/components.json"
        logger.debug("Fetching incidents from %r", api_url)

        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    api_url,
                    headers={"User-Agent": f"django-health-check@{__version__}"},
                    timeout=self.timeout.total_seconds(),
                    follow_redirects=True,
                )
            except httpx.TimeoutException as e:
                raise ServiceUnavailable("API request timed out") from e
            except httpx.RequestError as e:
                raise ServiceUnavailable(f"Failed to fetch API: {e}") from e

            try:
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                raise ServiceUnavailable(
                    f"HTTP error {e.response.status_code} fetching API from {api_url!r}"
                ) from e

            try:
                data = response.json()
            except ValueError as e:
                raise ServiceUnavailable("Failed to parse JSON response") from e

        if self.component:
            # Fail loudly when the configured component does not exist, so a
            # typo is not silently reported as healthy. Using .get() keeps a
            # payload without a "components" key from escaping as a raw
            # KeyError instead of a ServiceUnavailable.
            component_names = {c["name"] for c in data.get("components", [])}
            if self.component not in component_names:
                raise ServiceUnavailable(f"Component {self.component!r} not found")

        for incident in data.get("incidents", []):
            # Only open incidents count; "resolved" and "postmortem" are over.
            if (incident.get("status") not in ("resolved", "postmortem")) and (
                not self.component
                or any(
                    c["name"] == self.component for c in incident.get("components", [])
                )
            ):
                yield (
                    f"{incident['name']}: {incident['shortlink']}",
                    # API timestamps may end in "Z"; normalize so
                    # datetime.fromisoformat accepts them on Python < 3.11.
                    datetime.datetime.fromisoformat(
                        incident["updated_at"].replace("Z", "+00:00")
                    ),
                )