Skip to content

Real-time Transport Protocol (RTP)

voip.rtp

Real-time Transport Protocol (RTP) implementation of RFC 3550.

RTPCall dataclass

One call leg managed by the RTP multiplexer.

Associates a SIP dialog with the RealtimeTransportProtocol media stream. Subclass and override packet_received to process incoming media, and use send_packet to transmit outbound media.

The rtp and sip back-references allow the handler to send data back to the caller and to terminate the call via SIP BYE.

Subclass voip.audio.AudioCall for audio calls with codec negotiation, buffering, and decoding.

Attributes:

Name Type Description
rtp RealtimeTransportProtocol

Shared RTP multiplexer socket that delivers packets to this handler.

sip SessionInitiationProtocol

SIP session that answered this call (used for BYE etc.).

caller CallerID

Caller identifier as received in the SIP From header.

media MediaDescription

Negotiated SDP media description for this call leg.

srtp SRTPSession | None

Optional SRTP session for encrypting and decrypting media.

Source code in voip/rtp.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@dataclasses.dataclass
class RTPCall:
    """One call leg managed by the RTP multiplexer.

    Associates a SIP dialog with the `RealtimeTransportProtocol` media
    stream. Subclass and override `packet_received` to process incoming
    media, and use `send_packet` to transmit outbound media.

    The `rtp` and `sip` back-references allow the handler to send data
    back to the caller and to terminate the call via SIP BYE.

    Subclass `voip.audio.AudioCall` for audio calls with codec
    negotiation, buffering, and decoding.

    Attributes:
        rtp: Shared RTP multiplexer socket that delivers packets to this handler.
        sip: SIP session that answered this call (used for BYE etc.).
        caller: Caller identifier as received in the SIP From header.
        media: Negotiated SDP media description for this call leg.
        srtp: Optional SRTP session for encrypting and decrypting media.
    """

    rtp: RealtimeTransportProtocol
    sip: SessionInitiationProtocol
    media: MediaDescription
    caller: CallerID
    srtp: SRTPSession | None = None

    def packet_received(self, packet: RTPPacket, addr: tuple[str, int]) -> None:
        """Handle a parsed RTP packet. Override in subclasses to process media.

        Args:
            packet: Parsed RTP packet.
            addr: Remote ``(host, port)`` the packet arrived from.
        """

    def send_packet(self, packet: RTPPacket, addr: tuple[str, int]) -> None:
        """Serialize *packet* and send it via the shared RTP socket.

        Encrypts the packet with the call's SRTP session when one is set.

        Args:
            packet: RTP packet to send.
            addr: Destination ``(host, port)``.
        """
        data = bytes(packet)
        if self.srtp is not None:
            data = self.srtp.encrypt(data)
        self.rtp.send(data, addr)

    async def hang_up(self) -> None:
        """Terminate the call by sending a SIP BYE request.

        Raises:
            NotImplementedError: Not yet implemented; the call_id and remote
                SIP address need to be stored per call to make this work.
        """
        raise NotImplementedError("hang_up is not yet implemented")

    @classmethod
    def negotiate_codec(cls, remote_media: MediaDescription) -> MediaDescription:
        """Negotiate a media codec from the remote SDP offer.

        Override in subclasses to implement codec selection. The SIP layer
        calls this before sending a 200 OK; if the method raises the exception
        propagates and the call is not answered.

        Args:
            remote_media: The SDP ``m=audio`` section from the remote INVITE.

        Returns:
            A `MediaDescription` with the chosen codec.

        Raises:
            NotImplementedError: When not overridden by a subclass.
        """
        raise NotImplementedError(
            f"{cls.__name__} does not implement negotiate_codec. "
            "Override this classmethod in a subclass (e.g. AudioCall) to "
            "support codec negotiation."
        )
hang_up() async

Terminate the call by sending a SIP BYE request.

Raises:

Type Description
NotImplementedError

Not yet implemented; the call_id and remote SIP address need to be stored per call to make this work.

Source code in voip/rtp.py
141
142
143
144
145
146
147
148
async def hang_up(self) -> None:
    """Terminate the call by sending a SIP BYE request.

    Raises:
        NotImplementedError: Not yet implemented; the call_id and remote
            SIP address need to be stored per call to make this work.
    """
    raise NotImplementedError("hang_up is not yet implemented")
negotiate_codec(remote_media) classmethod

Negotiate a media codec from the remote SDP offer.

Override in subclasses to implement codec selection. The SIP layer calls this before sending a 200 OK; if the method raises the exception propagates and the call is not answered.

Parameters:

Name Type Description Default
remote_media MediaDescription

The SDP m=audio section from the remote INVITE.

required

Returns:

Type Description
MediaDescription

A MediaDescription with the chosen codec.

Raises:

Type Description
NotImplementedError

When not overridden by a subclass.

Source code in voip/rtp.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@classmethod
def negotiate_codec(cls, remote_media: MediaDescription) -> MediaDescription:
    """Negotiate a media codec from the remote SDP offer.

    Override in subclasses to implement codec selection. The SIP layer
    calls this before sending a 200 OK; if the method raises the exception
    propagates and the call is not answered.

    Args:
        remote_media: The SDP ``m=audio`` section from the remote INVITE.

    Returns:
        A `MediaDescription` with the chosen codec.

    Raises:
        NotImplementedError: When not overridden by a subclass.
    """
    raise NotImplementedError(
        f"{cls.__name__} does not implement negotiate_codec. "
        "Override this classmethod in a subclass (e.g. AudioCall) to "
        "support codec negotiation."
    )
packet_received(packet, addr)

Handle a parsed RTP packet. Override in subclasses to process media.

Parameters:

Name Type Description Default
packet RTPPacket

Parsed RTP packet.

required
addr tuple[str, int]

Remote (host, port) the packet arrived from.

required
Source code in voip/rtp.py
119
120
121
122
123
124
125
def packet_received(self, packet: RTPPacket, addr: tuple[str, int]) -> None:
    """Handle a parsed RTP packet. Override in subclasses to process media.

    Args:
        packet: Parsed RTP packet.
        addr: Remote ``(host, port)`` the packet arrived from.
    """
send_packet(packet, addr)

Serialize packet and send it via the shared RTP socket.

Encrypts the packet with the call's SRTP session when one is set.

Parameters:

Name Type Description Default
packet RTPPacket

RTP packet to send.

required
addr tuple[str, int]

Destination (host, port).

required
Source code in voip/rtp.py
127
128
129
130
131
132
133
134
135
136
137
138
139
def send_packet(self, packet: RTPPacket, addr: tuple[str, int]) -> None:
    """Serialize *packet* and send it via the shared RTP socket.

    Encrypts the packet with the call's SRTP session when one is set.

    Args:
        packet: RTP packet to send.
        addr: Destination ``(host, port)``.
    """
    data = bytes(packet)
    if self.srtp is not None:
        data = self.srtp.encrypt(data)
    self.rtp.send(data, addr)

RTPPacket dataclass

Bases: ByteSerializableObject

RTP data packet RFC 3550 §5.1.

Source code in voip/rtp.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@dataclasses.dataclass
class RTPPacket(ByteSerializableObject):
    """
    RTP data packet [RFC 3550 §5.1].

    [RFC 3550 §5.1]: https://datatracker.ietf.org/doc/html/rfc3550#section-5.1
    """

    payload_type: RTPPayloadType | int
    sequence_number: int
    timestamp: int
    ssrc: int
    payload: bytes
    header_size: typing.ClassVar[int] = 12

    @classmethod
    def parse(cls, data: bytes) -> RTPPacket:
        if len(data) < cls.header_size:
            raise ValueError(f"RTP packet too short: {len(data)} bytes")
        payload_type = data[1] & 0x7F
        sequence_number = (data[2] << 8) | data[3]
        timestamp = (data[4] << 24) | (data[5] << 16) | (data[6] << 8) | data[7]
        ssrc = (data[8] << 24) | (data[9] << 16) | (data[10] << 8) | data[11]
        return cls(
            payload_type=payload_type,
            sequence_number=sequence_number,
            timestamp=timestamp,
            ssrc=ssrc,
            payload=data[cls.header_size :],
        )

    def __bytes__(self) -> bytes:
        return (
            struct.pack(
                ">BBHII",
                0x80,  # V=2, P=0, X=0, CC=0
                self.payload_type,
                self.sequence_number & 0xFFFF,
                self.timestamp & 0xFFFFFFFF,
                self.ssrc,
            )
            + self.payload
        )

RTPPayloadType

Bases: IntEnum

Common RTP payload types, aligned with SDP media format identifiers.

Static payload types (0–95) are defined by RFC 3551. Dynamic payload types (96–127) are negotiated via SDP. Opus uses payload type 111 per RFC 7587.

Source code in voip/rtp.py
32
33
34
35
36
37
38
39
40
41
42
43
class RTPPayloadType(enum.IntEnum):
    """Common RTP payload types, aligned with SDP media format identifiers.

    Static payload types (0–95) are defined by RFC 3551.
    Dynamic payload types (96–127) are negotiated via SDP.
    Opus uses payload type 111 per RFC 7587.
    """

    PCMU = 0  # G.711 µ-law (RFC 3551)
    PCMA = 8  # G.711 A-law (RFC 3551)
    G722 = 9  # G.722 (RFC 3551)
    OPUS = 111  # RFC 7587 (dynamic)

RealtimeTransportProtocol dataclass

Bases: STUNProtocol

RTP multiplexer: routes incoming datagrams to per-call handlers (RFC 3550).

One instance manages multiple simultaneous calls on a single UDP socket. Register per-call Call handlers with register_call; each incoming datagram is dispatched to the matching handler's datagram_received method by remote source address.

Use addr=None in register_call as a wildcard catch-all for calls whose remote RTP address is not known in advance (no SDP in INVITE).

Source code in voip/rtp.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@dataclasses.dataclass(kw_only=True, slots=True)
class RealtimeTransportProtocol(STUNProtocol):
    """RTP multiplexer: routes incoming datagrams to per-call handlers (RFC 3550).

    One instance manages multiple simultaneous calls on a single UDP socket.
    Register per-call `Call` handlers with
    `register_call`; each incoming datagram is dispatched to the
    matching handler's `datagram_received` method by
    remote source address.

    Use ``addr=None`` in `register_call` as a wildcard catch-all for
    calls whose remote RTP address is not known in advance (no SDP in INVITE).
    """

    rtp_header_size: typing.ClassVar[int] = 12
    calls: dict[tuple[str, int] | None, RTPCall] = dataclasses.field(
        init=False, default_factory=dict
    )
    public_address: asyncio.Future[tuple[str, int]] = dataclasses.field(
        init=False, default_factory=asyncio.Future
    )

    def stun_connection_made(self, transport, addr):
        self.public_address.set_result(addr)

    def register_call(
        self,
        addr: tuple[str, int] | None,
        handler: RTPCall,
    ) -> None:
        """Register *handler* for RTP traffic arriving from *addr*.

        Use ``addr=None`` as a wildcard to handle traffic from any source that
        has no dedicated routing entry (useful when the caller's RTP address is
        not known in advance from the INVITE SDP).

        Args:
            addr: Remote ``(ip, port)`` as it will appear in incoming datagrams,
                or ``None`` to register a wildcard catch-all handler.
            handler: A `Call` instance whose
                `datagram_received` will be called for
                matching packets.
        """
        logger.info(
            json.dumps(
                {
                    "event": "rtp_call_registered",
                    "addr": list(addr) if addr else None,
                    "handler": type(handler).__name__,
                }
            ),
            extra={"addr": addr},
        )
        self.calls[addr] = handler

    def unregister_call(self, addr: tuple[str, int] | None) -> None:
        """Remove the handler registered for *addr*.

        Args:
            addr: The same key that was passed to `register_call`.
                Silently ignored when no handler is registered for *addr*.
        """
        if addr in self.calls:
            logger.info(
                json.dumps(
                    {
                        "event": "rtp_call_unregistered",
                        "addr": list(addr) if addr else None,
                    }
                ),
                extra={"addr": addr},
            )
            self.calls.pop(addr)

    def packet_received(self, data: bytes, addr: tuple[str, int]) -> None:
        """Route an incoming SRTP datagram to the matching per-call handler.

        Looks up *addr* in the call registry.  Falls back to the wildcard
        ``None`` handler when no exact match exists.  Drops the packet with a
        debug log when no handler is registered at all.

        When the matched handler carries an SRTP session the packet is
        authenticated and decrypted before being forwarded; packets that fail
        authentication are logged at WARNING level and discarded.
        """
        handler = self.calls.get(addr)
        if handler is None:
            handler = self.calls.get(None)
        if handler is not None:
            if handler.srtp is not None:
                decrypted = handler.srtp.decrypt(data)
                if decrypted is None:
                    logger.warning(
                        "SRTP authentication failed for packet from %s:%s, discarding",
                        addr[0],
                        addr[1],
                    )
                    return
                data = decrypted
            try:
                handler.packet_received(RTPPacket.parse(data), addr)
            except ValueError:
                logger.warning(
                    "Malformed RTP packet from %s:%s, discarding",
                    addr[0],
                    addr[1],
                )
        else:
            logger.debug(
                "No call handler registered for %s:%s, dropping RTP packet",
                addr[0],
                addr[1],
            )
packet_received(data, addr)

Route an incoming SRTP datagram to the matching per-call handler.

Looks up addr in the call registry. Falls back to the wildcard None handler when no exact match exists. Drops the packet with a debug log when no handler is registered at all.

When the matched handler carries an SRTP session the packet is authenticated and decrypted before being forwarded; packets that fail authentication are logged at WARNING level and discarded.

Source code in voip/rtp.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def packet_received(self, data: bytes, addr: tuple[str, int]) -> None:
    """Route an incoming SRTP datagram to the matching per-call handler.

    Looks up *addr* in the call registry.  Falls back to the wildcard
    ``None`` handler when no exact match exists.  Drops the packet with a
    debug log when no handler is registered at all.

    When the matched handler carries an SRTP session the packet is
    authenticated and decrypted before being forwarded; packets that fail
    authentication are logged at WARNING level and discarded.
    """
    handler = self.calls.get(addr)
    if handler is None:
        handler = self.calls.get(None)
    if handler is not None:
        if handler.srtp is not None:
            decrypted = handler.srtp.decrypt(data)
            if decrypted is None:
                logger.warning(
                    "SRTP authentication failed for packet from %s:%s, discarding",
                    addr[0],
                    addr[1],
                )
                return
            data = decrypted
        try:
            handler.packet_received(RTPPacket.parse(data), addr)
        except ValueError:
            logger.warning(
                "Malformed RTP packet from %s:%s, discarding",
                addr[0],
                addr[1],
            )
    else:
        logger.debug(
            "No call handler registered for %s:%s, dropping RTP packet",
            addr[0],
            addr[1],
        )
register_call(addr, handler)

Register handler for RTP traffic arriving from addr.

Use addr=None as a wildcard to handle traffic from any source that has no dedicated routing entry (useful when the caller's RTP address is not known in advance from the INVITE SDP).

Parameters:

Name Type Description Default
addr tuple[str, int] | None

Remote (ip, port) as it will appear in incoming datagrams, or None to register a wildcard catch-all handler.

required
handler RTPCall

A Call instance whose datagram_received will be called for matching packets.

required
Source code in voip/rtp.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def register_call(
    self,
    addr: tuple[str, int] | None,
    handler: RTPCall,
) -> None:
    """Register *handler* for RTP traffic arriving from *addr*.

    Use ``addr=None`` as a wildcard to handle traffic from any source that
    has no dedicated routing entry (useful when the caller's RTP address is
    not known in advance from the INVITE SDP).

    Args:
        addr: Remote ``(ip, port)`` as it will appear in incoming datagrams,
            or ``None`` to register a wildcard catch-all handler.
        handler: A `Call` instance whose
            `datagram_received` will be called for
            matching packets.
    """
    logger.info(
        json.dumps(
            {
                "event": "rtp_call_registered",
                "addr": list(addr) if addr else None,
                "handler": type(handler).__name__,
            }
        ),
        extra={"addr": addr},
    )
    self.calls[addr] = handler
unregister_call(addr)

Remove the handler registered for addr.

Parameters:

Name Type Description Default
addr tuple[str, int] | None

The same key that was passed to register_call. Silently ignored when no handler is registered for addr.

required
Source code in voip/rtp.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def unregister_call(self, addr: tuple[str, int] | None) -> None:
    """Remove the handler registered for *addr*.

    Args:
        addr: The same key that was passed to `register_call`.
            Silently ignored when no handler is registered for *addr*.
    """
    if addr in self.calls:
        logger.info(
            json.dumps(
                {
                    "event": "rtp_call_unregistered",
                    "addr": list(addr) if addr else None,
                }
            ),
            extra={"addr": addr},
        )
        self.calls.pop(addr)

Encryption

voip.srtp

Secure Real-time Transport Protocol (SRTP) implementation of RFC 3711.

Provides symmetric key encryption and authentication for RTP media streams using the AES_CM_128_HMAC_SHA1_80 cipher suite. Keys are negotiated via SDP Security Descriptions (SDES, RFC 4568) carried in the SIP 200 OK SDP body, which is itself protected by TLS.

Requires the cryptography package (included in any installation).

SRTPSession dataclass

SRTP session for one call leg using AES_CM_128_HMAC_SHA1_80.

Handles symmetric encryption and authentication of RTP packets. Key material is derived from master_key and master_salt via the SRTP pseudo-random function (RFC 3711 §4.3.1).

Create a fresh session for each answered call via generate and pass it to the RTPCall instance. The SDP a=crypto: attribute is produced by sdes_attribute.

Attributes:

Name Type Description
master_key bytes

16-byte AES master key (randomly generated).

master_salt bytes

14-byte master salt (randomly generated).

Source code in voip/srtp.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
@dataclasses.dataclass(slots=True)
class SRTPSession:
    """SRTP session for one call leg using AES_CM_128_HMAC_SHA1_80.

    Handles symmetric encryption and authentication of RTP packets.
    Key material is derived from `master_key` and `master_salt` via the
    SRTP pseudo-random function (RFC 3711 §4.3.1).

    Create a fresh session for each answered call via `generate` and pass
    it to the `RTPCall` instance.  The SDP `a=crypto:` attribute is produced
    by `sdes_attribute`.

    Attributes:
        master_key: 16-byte AES master key (randomly generated).
        master_salt: 14-byte master salt (randomly generated).
    """

    master_key: bytes
    master_salt: bytes
    _session_key: bytes = dataclasses.field(init=False)
    _session_auth_key: bytes = dataclasses.field(init=False)
    _session_salt: bytes = dataclasses.field(init=False)
    #: Rollover counter and highest sent sequence number for encryption.
    _send_roc: int = dataclasses.field(init=False, default=0)
    _last_send_seq: int = dataclasses.field(init=False, default=-1)
    #: Rollover counter and highest received sequence number for decryption.
    _recv_roc: int = dataclasses.field(init=False, default=0)
    _last_recv_seq: int = dataclasses.field(init=False, default=-1)

    def __post_init__(self) -> None:
        self._session_key = _prf(self.master_key, 0x00, self.master_salt, _KEY_SIZE)
        self._session_auth_key = _prf(self.master_key, 0x01, self.master_salt, 20)
        self._session_salt = _prf(self.master_key, 0x02, self.master_salt, _SALT_SIZE)

    @classmethod
    def generate(cls) -> SRTPSession:
        """Generate a new SRTP session with a cryptographically random key and salt."""
        return cls(
            master_key=os.urandom(_KEY_SIZE),
            master_salt=os.urandom(_SALT_SIZE),
        )

    @property
    def sdes_attribute(self) -> str:
        """SDP `a=crypto:` attribute value for SDES key exchange (RFC 4568).

        The key material (master key followed by master salt) is base64-encoded
        and wrapped in the standard SDES inline format.  Include this value in
        the SDP `a=crypto:` attribute of the answered media description:

        ```
        a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:<value>
        ```
        """
        key_salt = base64.b64encode(self.master_key + self.master_salt).decode()
        return f"1 {CIPHER_SUITE} inline:{key_salt}"

    def _compute_iv(self, ssrc: int, index: int) -> bytes:
        """Compute the 128-bit AES-CM IV for a given SSRC and packet index.

        IV = (session_salt * 2^16) XOR (SSRC * 2^64) XOR (index * 2^16)
        per RFC 3711 §4.1.1.
        """
        iv_int = (
            (int.from_bytes(self._session_salt, "big") << 16)
            ^ (ssrc << 64)
            ^ (index << 16)
        )
        return iv_int.to_bytes(16, "big")

    def _auth_tag(self, packet_no_tag: bytes, roc: int) -> bytes:
        """Compute the 10-byte HMAC-SHA1 authentication tag (RFC 3711 §4.2)."""
        roc_bytes = struct.pack(">I", roc)
        mac = hmac.HMAC(self._session_auth_key, hashes.SHA1())  # noqa: S303
        mac.update(packet_no_tag + roc_bytes)
        return mac.finalize()[:_AUTH_TAG_SIZE]

    def _estimate_recv_index(self, seq: int) -> tuple[int, int]:
        """Estimate the packet index and new ROC for a received sequence number.

        Implements the index estimation algorithm from RFC 3711 §3.3.1.

        Args:
            seq: The 16-bit sequence number from the received RTP header.

        Returns:
            A `(index, roc_guess)` tuple where `index` is the estimated
            48-bit packet index and `roc_guess` is the ROC value used.
        """
        s_l = self._last_recv_seq
        roc = self._recv_roc
        if s_l < 0:
            # No packets received yet; use the current ROC.
            return (roc << 16) | seq, roc
        if s_l < 0x8000:  # s_l < 2^15
            if seq - s_l > 0x8000:
                roc_guess = (roc - 1) % (1 << 32)
            else:
                roc_guess = roc
        else:  # s_l >= 2^15
            if s_l - seq > 0x8000:
                roc_guess = (roc + 1) % (1 << 32)
            else:
                roc_guess = roc
        return (roc_guess << 16) | seq, roc_guess

    def encrypt(self, packet: bytes) -> bytes:
        """Encrypt an RTP packet to produce an SRTP packet.

        Encrypts the RTP payload with AES-CM and appends an 80-bit
        HMAC-SHA1 authentication tag.  Tracks sequence number rollover per
        RFC 3711 §3.3.1 to ensure the packet index remains unique.

        Args:
            packet: Raw RTP packet bytes (at least 12 bytes).

        Returns:
            SRTP packet with encrypted payload and appended auth tag.
        """
        if len(packet) < 12:
            return packet
        header = packet[:12]
        payload = packet[12:]
        ssrc = struct.unpack(">I", header[8:12])[0]
        seq = struct.unpack(">H", header[2:4])[0]

        # Detect rollover: the sequence number wrapped from ~65535 back to ~0.
        # For the send side, sequence numbers always increase monotonically so
        # any decrease indicates a rollover.
        if self._last_send_seq >= 0 and seq < self._last_send_seq:
            self._send_roc = (self._send_roc + 1) % (1 << 32)
        self._last_send_seq = seq

        index = (self._send_roc << 16) | seq
        iv = self._compute_iv(ssrc, index)
        cipher = Cipher(algorithms.AES(self._session_key), modes.CTR(iv))
        enc = cipher.encryptor()
        encrypted_payload = enc.update(payload) + enc.finalize()

        srtp_no_tag = header + encrypted_payload
        return srtp_no_tag + self._auth_tag(srtp_no_tag, self._send_roc)

    def decrypt(self, packet: bytes) -> bytes | None:
        """Decrypt and authenticate an SRTP packet.

        Verifies the HMAC-SHA1-80 authentication tag and, if valid, decrypts
        the payload with AES-CM.  The packet index is estimated per
        RFC 3711 §3.3.1, tracking rollovers across the 16-bit sequence space.

        Args:
            packet: Raw SRTP packet bytes (at least 12 + 10 bytes).

        Returns:
            Decrypted RTP packet bytes, or `None` when authentication fails
            or the packet is too short.
        """
        if len(packet) < 12 + _AUTH_TAG_SIZE:
            return None
        srtp_no_tag = packet[:-_AUTH_TAG_SIZE]
        received_tag = packet[-_AUTH_TAG_SIZE:]

        header = packet[:12]
        ssrc = struct.unpack(">I", header[8:12])[0]
        seq = struct.unpack(">H", header[2:4])[0]

        index, roc_guess = self._estimate_recv_index(seq)

        expected_tag = self._auth_tag(srtp_no_tag, roc_guess)
        if not _hmac_stdlib.compare_digest(received_tag, expected_tag):
            return None

        # Authentication passed — update the highest received sequence number
        # and ROC per RFC 3711 §3.3.1.
        if roc_guess == self._recv_roc:
            if self._last_recv_seq < 0 or seq > self._last_recv_seq:
                self._last_recv_seq = seq
        elif roc_guess == (self._recv_roc + 1) % (1 << 32):
            self._recv_roc = roc_guess
            self._last_recv_seq = seq

        encrypted_payload = srtp_no_tag[12:]
        iv = self._compute_iv(ssrc, index)
        cipher = Cipher(algorithms.AES(self._session_key), modes.CTR(iv))
        dec = cipher.decryptor()
        payload = dec.update(encrypted_payload) + dec.finalize()
        return header + payload
sdes_attribute property

SDP a=crypto: attribute value for SDES key exchange (RFC 4568).

The key material (master key followed by master salt) is base64-encoded and wrapped in the standard SDES inline format. Include this value in the SDP a=crypto: attribute of the answered media description:

a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:<value>
decrypt(packet)

Decrypt and authenticate an SRTP packet.

Verifies the HMAC-SHA1-80 authentication tag and, if valid, decrypts the payload with AES-CM. The packet index is estimated per RFC 3711 §3.3.1, tracking rollovers across the 16-bit sequence space.

Parameters:

Name Type Description Default
packet bytes

Raw SRTP packet bytes (at least 12 + 10 bytes).

required

Returns:

Type Description
bytes | None

Decrypted RTP packet bytes, or None when authentication fails

bytes | None

or the packet is too short.

Source code in voip/srtp.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def decrypt(self, packet: bytes) -> bytes | None:
    """Decrypt and authenticate an SRTP packet.

    Verifies the HMAC-SHA1-80 authentication tag and, if valid, decrypts
    the payload with AES-CM.  The packet index is estimated per
    RFC 3711 §3.3.1, tracking rollovers across the 16-bit sequence space.

    Args:
        packet: Raw SRTP packet bytes (at least 12 + 10 bytes).

    Returns:
        Decrypted RTP packet bytes, or `None` when authentication fails
        or the packet is too short.
    """
    if len(packet) < 12 + _AUTH_TAG_SIZE:
        return None
    srtp_no_tag = packet[:-_AUTH_TAG_SIZE]
    received_tag = packet[-_AUTH_TAG_SIZE:]

    header = packet[:12]
    ssrc = struct.unpack(">I", header[8:12])[0]
    seq = struct.unpack(">H", header[2:4])[0]

    index, roc_guess = self._estimate_recv_index(seq)

    expected_tag = self._auth_tag(srtp_no_tag, roc_guess)
    if not _hmac_stdlib.compare_digest(received_tag, expected_tag):
        return None

    # Authentication passed — update the highest received sequence number
    # and ROC per RFC 3711 §3.3.1.
    if roc_guess == self._recv_roc:
        if self._last_recv_seq < 0 or seq > self._last_recv_seq:
            self._last_recv_seq = seq
    elif roc_guess == (self._recv_roc + 1) % (1 << 32):
        self._recv_roc = roc_guess
        self._last_recv_seq = seq

    encrypted_payload = srtp_no_tag[12:]
    iv = self._compute_iv(ssrc, index)
    cipher = Cipher(algorithms.AES(self._session_key), modes.CTR(iv))
    dec = cipher.decryptor()
    payload = dec.update(encrypted_payload) + dec.finalize()
    return header + payload
encrypt(packet)

Encrypt an RTP packet to produce an SRTP packet.

Encrypts the RTP payload with AES-CM and appends an 80-bit HMAC-SHA1 authentication tag. Tracks sequence number rollover per RFC 3711 §3.3.1 to ensure the packet index remains unique.

Parameters:

Name Type Description Default
packet bytes

Raw RTP packet bytes (at least 12 bytes).

required

Returns:

Type Description
bytes

SRTP packet with encrypted payload and appended auth tag.

Source code in voip/srtp.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def encrypt(self, packet: bytes) -> bytes:
    """Encrypt an RTP packet to produce an SRTP packet.

    Encrypts the RTP payload with AES-CM and appends an 80-bit
    HMAC-SHA1 authentication tag.  Tracks sequence number rollover per
    RFC 3711 §3.3.1 to ensure the packet index remains unique.

    Args:
        packet: Raw RTP packet bytes (at least 12 bytes).

    Returns:
        SRTP packet with encrypted payload and appended auth tag.
    """
    if len(packet) < 12:
        return packet
    header = packet[:12]
    payload = packet[12:]
    ssrc = struct.unpack(">I", header[8:12])[0]
    seq = struct.unpack(">H", header[2:4])[0]

    # Detect rollover: the sequence number wrapped from ~65535 back to ~0.
    # For the send side, sequence numbers always increase monotonically so
    # any decrease indicates a rollover.
    if self._last_send_seq >= 0 and seq < self._last_send_seq:
        self._send_roc = (self._send_roc + 1) % (1 << 32)
    self._last_send_seq = seq

    index = (self._send_roc << 16) | seq
    iv = self._compute_iv(ssrc, index)
    cipher = Cipher(algorithms.AES(self._session_key), modes.CTR(iv))
    enc = cipher.encryptor()
    encrypted_payload = enc.update(payload) + enc.finalize()

    srtp_no_tag = header + encrypted_payload
    return srtp_no_tag + self._auth_tag(srtp_no_tag, self._send_roc)
generate() classmethod

Generate a new SRTP session with a cryptographically random key and salt.

Source code in voip/srtp.py
 96
 97
 98
 99
100
101
102
@classmethod
def generate(cls) -> SRTPSession:
    """Generate a new SRTP session with a cryptographically random key and salt."""
    return cls(
        master_key=os.urandom(_KEY_SIZE),
        master_salt=os.urandom(_SALT_SIZE),
    )

NAT Traversal

voip.stun

Session Traversal Utilities for NAT (STUN) implementation of RFC 5389.

STUNAttributeType

Bases: IntEnum

STUN attribute types (RFC 5389 §15).

Source code in voip/stun.py
27
28
29
30
31
class STUNAttributeType(enum.IntEnum):
    """STUN attribute types (RFC 5389 §15)."""

    MAPPED_ADDRESS = 0x0001
    XOR_MAPPED_ADDRESS = 0x0020

STUNMessageType

Bases: IntEnum

STUN message types (RFC 5389 §6).

Source code in voip/stun.py
20
21
22
23
24
class STUNMessageType(enum.IntEnum):
    """STUN message types (RFC 5389 §6)."""

    BINDING_REQUEST = 0x0001
    BINDING_SUCCESS_RESPONSE = 0x0101

STUNProtocol dataclass

Bases: DatagramProtocol

Protocol for demultiplexing STUN (RFC 5389/7983) from other traffic.

Use this as the base class for any protocol that shares a UDP socket with STUN. Incoming datagrams whose first byte is in [0, 3] (RFC 7983) are treated as STUN messages and routed to the STUN handler. All other datagrams are forwarded to packet_received.

When the socket is ready and the reachable address is known, stun_connection_made is called. If stun_server_address is None this happens synchronously from connection_made with the local socket address. If STUN is configured it is called from datagram_received when the Binding Response arrives, with the discovered public address. Subclasses only need to override stun_connection_made — no connection_made override is required:

class MyProtocol(STUNProtocol):
    def stun_connection_made(
        self,
        transport: asyncio.DatagramTransport,
        addr: tuple[str, int],
    ) -> None:
        # socket is ready; addr is the reachable (public or local) address
        ...

    def packet_received(self, data: bytes, addr: tuple[str, int]) -> None:
        process(data)
Source code in voip/stun.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@dataclasses.dataclass(kw_only=True, slots=True)
class STUNProtocol(asyncio.DatagramProtocol):
    """
    Protocol for demultiplexing STUN (RFC 5389/7983) from other traffic.

    Use this as the base class for any protocol that shares a UDP socket with
    STUN. Incoming datagrams whose first byte is in `[0, 3]` (RFC 7983) are
    treated as STUN messages and routed to the STUN handler. All other
    datagrams are forwarded to `packet_received`.

    When the socket is ready and the reachable address is known,
    `stun_connection_made` is called.  If `stun_server_address` is
    `None` this happens synchronously from `connection_made` with the
    local socket address.  If STUN is configured it is called from
    `datagram_received` when the Binding Response arrives, with the
    discovered public address.  Subclasses only need to override
    `stun_connection_made` — no `connection_made` override is
    required:

    ```python
    class MyProtocol(STUNProtocol):
        def stun_connection_made(
            self,
            transport: asyncio.DatagramTransport,
            addr: tuple[str, int],
        ) -> None:
            # socket is ready; addr is the reachable (public or local) address
            ...

        def packet_received(self, data: bytes, addr: tuple[str, int]) -> None:
            process(data)
    ```
    """

    stun_server_address: tuple[str, int] | None = ("stun.cloudflare.com", 3478)
    _stun_transaction_id: bytes = dataclasses.field(init=False, default=b"")
    transport: asyncio.DatagramTransport | None = dataclasses.field(
        init=False, default=None
    )

    def connection_made(self, transport: asyncio.DatagramTransport) -> None:
        self.transport = transport
        if self.stun_server_address is None:
            self.stun_connection_made(transport, transport.get_extra_info("sockname"))
        else:
            self._stun_transaction_id = uuid.uuid4().bytes[:12]
            self._send_stun_request()

    def stun_connection_made(
        self,
        transport: asyncio.DatagramTransport,
        addr: tuple[str, int],
    ) -> None:
        """Called when the socket is ready and the reachable address is known.

        When STUN is configured, *addr* is the **public** ``(ip, port)``
        discovered from the STUN Binding Response and this method is called
        by `datagram_received`.  When ``stun_server_address=None``,
        *addr* is the local socket address and this method is called
        synchronously from `connection_made`.

        Subclasses override this method to trigger protocol-specific
        initialisation once the socket is ready.

        Args:
            transport: The UDP transport bound to this protocol.
            addr: Reachable ``(host, port)`` — public when STUN is used,
                local otherwise.
        """  # noqa: D401

    def send(self, data: bytes, addr: tuple[str, int]) -> None:
        """Send a raw datagram through the shared UDP socket.

        Args:
            data: Raw bytes to transmit.
            addr: Destination ``(host, port)``.
        """
        if self.transport is not None:
            self.transport.sendto(data, addr)

    def close(self) -> None:
        """Close the underlying UDP transport."""
        if self.transport is not None:
            self.transport.close()

    def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
        if data and data[0] < 4:
            # RFC 7983: first byte in [0, 3] indicates a STUN packet.
            if (
                len(data) >= 20
                and self._stun_transaction_id
                and data[8:20] == self._stun_transaction_id
            ):
                self._parse_stun_response(data)
            return
        self.packet_received(data, addr)

    def connection_lost(self, exc: Exception | None) -> None:
        """Clear the internal transport reference on disconnect."""
        self.transport = None

    def error_received(self, exc: Exception) -> None:
        logger.warning("UDP transport error (ignored): %s", exc)

    def packet_received(self, data: bytes, addr: tuple[str, int]) -> None:
        """Override in subclasses to handle non-STUN datagrams.

        Args:
            data: Raw datagram payload (first byte ≥ 4, not a STUN packet).
            addr: Source ``(host, port)`` of the datagram.
        """

    def _send_stun_request(self) -> None:
        """Send a STUN Binding Request through the protocol's own transport.

        Sends the request through the transport bound to this protocol so the
        server observes the same NAT mapping as real traffic.  Responses are
        demultiplexed from normal datagrams via the RFC 7983 first-byte rule.
        """
        if self.transport is None or self.stun_server_address is None:
            return
        request = struct.pack(
            ">HHI12s",
            STUNMessageType.BINDING_REQUEST,
            0,
            MAGIC_COOKIE,
            self._stun_transaction_id,
        )
        logger.debug("Sending STUN Binding Request to %s:%s", *self.stun_server_address)
        self.transport.sendto(request, self.stun_server_address)

    def _parse_stun_response(self, data: bytes) -> None:
        """Parse a STUN Binding Success Response and invoke :meth:`stun_connection_made`."""
        if len(data) < 20:
            return
        message_type, _message_len, magic_cookie = struct.unpack(">HHI", data[:8])
        response_tid = data[8:20]
        if (
            magic_cookie != MAGIC_COOKIE
            or message_type != STUNMessageType.BINDING_SUCCESS_RESPONSE
            or response_tid != self._stun_transaction_id
        ):
            return
        # Clear transaction ID so duplicate responses are ignored.
        self._stun_transaction_id = b""
        offset = 20
        xor_mapped: tuple[str, int] | None = None
        mapped: tuple[str, int] | None = None
        while offset + 4 <= len(data):
            attribute_type, attribute_len = struct.unpack(
                ">HH", data[offset : offset + 4]
            )
            attribute_value = data[offset + 4 : offset + 4 + attribute_len]
            if (
                attribute_type == STUNAttributeType.XOR_MAPPED_ADDRESS
                and len(attribute_value) >= 8
                and attribute_value[1] == 0x01  # IPv4
            ):
                port = struct.unpack(">H", attribute_value[2:4])[0] ^ (
                    MAGIC_COOKIE >> 16
                )
                ip_int = struct.unpack(">I", attribute_value[4:8])[0] ^ MAGIC_COOKIE
                xor_mapped = (socket.inet_ntoa(struct.pack(">I", ip_int)), port)
            elif (
                attribute_type == STUNAttributeType.MAPPED_ADDRESS
                and len(attribute_value) >= 8
                and attribute_value[1] == 0x01  # IPv4
            ):
                port = struct.unpack(">H", attribute_value[2:4])[0]
                mapped = (socket.inet_ntoa(attribute_value[4:8]), port)
            offset += 4 + ((attribute_len + 3) & ~3)  # 4-byte aligned
        result = xor_mapped or mapped
        if result:
            logger.debug("STUN response: %s:%s", *result)
            assert self.transport is not None
            self.stun_connection_made(self.transport, result)
        else:
            logger.error("No address attribute in STUN response")
close()

Close the underlying UDP transport.

Source code in voip/stun.py
114
115
116
117
def close(self) -> None:
    """Close the underlying UDP transport."""
    if self.transport is not None:
        self.transport.close()
connection_lost(exc)

Clear the internal transport reference on disconnect.

Source code in voip/stun.py
131
132
133
def connection_lost(self, exc: Exception | None) -> None:
    """Clear the internal transport reference on disconnect."""
    self.transport = None
packet_received(data, addr)

Override in subclasses to handle non-STUN datagrams.

Parameters:

Name Type Description Default
data bytes

Raw datagram payload (first byte ≥ 4, not a STUN packet).

required
addr tuple[str, int]

Source (host, port) of the datagram.

required
Source code in voip/stun.py
138
139
140
141
142
143
144
def packet_received(self, data: bytes, addr: tuple[str, int]) -> None:
    """Override in subclasses to handle non-STUN datagrams.

    Args:
        data: Raw datagram payload (first byte ≥ 4, not a STUN packet).
        addr: Source ``(host, port)`` of the datagram.
    """
send(data, addr)

Send a raw datagram through the shared UDP socket.

Parameters:

Name Type Description Default
data bytes

Raw bytes to transmit.

required
addr tuple[str, int]

Destination (host, port).

required
Source code in voip/stun.py
104
105
106
107
108
109
110
111
112
def send(self, data: bytes, addr: tuple[str, int]) -> None:
    """Send a raw datagram through the shared UDP socket.

    Args:
        data: Raw bytes to transmit.
        addr: Destination ``(host, port)``.
    """
    if self.transport is not None:
        self.transport.sendto(data, addr)
stun_connection_made(transport, addr)

Called when the socket is ready and the reachable address is known.

When STUN is configured, addr is the public (ip, port) discovered from the STUN Binding Response and this method is called by datagram_received. When stun_server_address=None, addr is the local socket address and this method is called synchronously from connection_made.

Subclasses override this method to trigger protocol-specific initialisation once the socket is ready.

Parameters:

Name Type Description Default
transport DatagramTransport

The UDP transport bound to this protocol.

required
addr tuple[str, int]

Reachable (host, port) — public when STUN is used, local otherwise.

required
Source code in voip/stun.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def stun_connection_made(
    self,
    transport: asyncio.DatagramTransport,
    addr: tuple[str, int],
) -> None:
    """Called when the socket is ready and the reachable address is known.

    When STUN is configured, *addr* is the **public** ``(ip, port)``
    discovered from the STUN Binding Response and this method is called
    by `datagram_received`.  When ``stun_server_address=None``,
    *addr* is the local socket address and this method is called
    synchronously from `connection_made`.

    Subclasses override this method to trigger protocol-specific
    initialisation once the socket is ready.

    Args:
        transport: The UDP transport bound to this protocol.
        addr: Reachable ``(host, port)`` — public when STUN is used,
            local otherwise.
    """  # noqa: D401