Skip to content

Session Initiation Protocol (SIP)

voip.sip.Dialog dataclass

Peer-to-peer SIP relationship between two user agents (RFC 3261 §12).

Subclass Dialog to implement call handling. Set the subclass as dialog_class on the SIP session for inbound calls:

class MyDialog(Dialog):
    def call_received(self) -> None:
        self.ringing()
        self.answer(session_class=MyCall)

class MySession(SessionInitiationProtocol):
    dialog_class = MyDialog

For outbound calls:

dialog = Dialog(sip=my_sip_session)
await dialog.dial("sip:bob@biloxi.com", session_class=MyCall)

Parameters:

Name Type Description Default
sip SessionInitiationProtocol | None

The parent protocol the dialog belongs to.

None
Source code in voip/sip/dialog.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@dataclasses.dataclass(kw_only=True, slots=True)
class Dialog:
    """Peer-to-peer SIP relationship between two user agents [RFC 3261 §12].

    Subclass `Dialog` to implement call handling.  Set the subclass as
    `dialog_class` on the SIP session for inbound calls:

    ```python
    class MyDialog(Dialog):
        def call_received(self) -> None:
            self.ringing()
            self.answer(session_class=MyCall)

    class MySession(SessionInitiationProtocol):
        dialog_class = MyDialog
    ```

    For outbound calls:

    ```python
    dialog = Dialog(sip=my_sip_session)
    await dialog.dial("sip:bob@biloxi.com", session_class=MyCall)
    ```


    [RFC 3261 §12]: https://datatracker.ietf.org/doc/html/rfc3261#section-12

    Args:
        uac: URI used to build the `From` and `To` header values.
        call_id: `Call-ID` header value. Generated as
            ``<uuid4>@<hostname>`` when omitted.
        local_tag: Tag placed in the `From` header. A random UUID when omitted.
        remote_tag: Tag appended to the `To` header once it is known.
        remote_contact: Contact of the remote party, if known.
        route_set: Route set of the dialog. Empty by default.
        local_party: Local party of the dialog, if known.
        remote_party: Remote party of the dialog, if known.
        outbound_cseq: Initial value of the outbound CSeq counter.
        sip: The parent protocol the dialog belongs to.
        invite_transaction: INVITE transaction that `ringing`, `answer` and
            `reject` delegate to. Those methods do nothing while it is `None`.
    """

    # https://datatracker.ietf.org/doc/html/rfc3261#section-17.1.1
    T1: typing.ClassVar[datetime.timedelta] = datetime.timedelta(milliseconds=500)

    # https://datatracker.ietf.org/doc/html/rfc3261#section-17.1.2
    BYE_ACK_TIMEOUT: typing.ClassVar[datetime.timedelta] = 64 * T1

    # The default is None, so the annotation has to admit it.
    uac: SipURI | None = None
    call_id: str = dataclasses.field(
        default_factory=lambda: f"{uuid.uuid4()}@{socket.gethostname()}",
        compare=False,
    )
    local_tag: str = dataclasses.field(
        default_factory=lambda: str(uuid.uuid4()), compare=True
    )
    remote_tag: str | None = dataclasses.field(default=None, compare=True)
    remote_contact: SipURI | None = dataclasses.field(default=None, compare=True)
    route_set: list[SipURI] = dataclasses.field(default_factory=list)
    local_party: str | None = dataclasses.field(default=None, compare=False)
    remote_party: str | None = dataclasses.field(default=None, compare=False)
    outbound_cseq: int = dataclasses.field(default=1, compare=False)
    sip: transactions.SessionInitiationProtocol | None = dataclasses.field(
        default=None, compare=False, repr=False
    )
    invite_transaction: transactions.InviteTransaction | None = dataclasses.field(
        default=None, compare=False, repr=False
    )

    session: Session | None = dataclasses.field(default=None, init=False, compare=False)
    # NOTE(review): `created` is a naive local timestamp and, like `uac` and
    # `route_set`, takes part in `==` because `compare` is left at its default.
    # Confirm that equality is meant to depend on the creation time.
    created: datetime.datetime = dataclasses.field(
        init=False, default_factory=datetime.datetime.now
    )

    @property
    def from_header(self) -> str:
        """The logical sender of a request.

        Built from the scheme, user and host of `uac` plus `local_tag`.
        """
        return f"{self.uac.scheme}:{self.uac.user}@{self.uac.host};tag={self.local_tag}"

    @property
    def to_header(self) -> str:
        """The logical recipient of a request.

        Built from `uac` including its port and ``transport`` parameter
        (``TLS`` when the URI carries none). The ``tag`` parameter is only
        appended once `remote_tag` is set.
        """
        uac = self.uac
        transport = uac.parameters.get("transport", "TLS")
        part = f"{uac.scheme}:{uac.user}@{uac.host}:{uac.port};transport={transport}"
        if self.remote_tag:
            part += f";tag={self.remote_tag}"
        return part

    @property
    def headers(self) -> dict[str, str]:
        """Return the `From`, `To` and `Call-ID` headers for this dialog."""
        return {
            "From": self.from_header,
            "To": self.to_header,
            "Call-ID": self.call_id,
        }

    def call_received(self) -> None:
        """
        Called when an INVITE is received from the remote party.

        Override in subclasses to [answer][voip.sip.Dialog.answer],
        [ring][voip.sip.Dialog.ringing],
        or [reject][voip.sip.Dialog.reject] the call.

        The base implementation rejects with a busy signal.
        """  # noqa: D401
        self.reject()

    def hangup_received(self) -> None:
        """
        Called when the remote party sends a BYE.

        Override in subclasses to perform teardown. The base implementation
        does nothing.
        """  # noqa: D401

    def ringing(self) -> None:
        """
        Send a ringing signal to the remote party.

        This is optional but recommended for good user experience.
        If not called, the caller will hear silence until the call is accepted or rejected.

        Does nothing when the dialog has no INVITE transaction.
        """
        if self.invite_transaction is not None:
            self.invite_transaction.ringing()

    def answer(
        self, *, session_class: type[Session], **session_kwargs: typing.Any
    ) -> None:
        """
        Accept the inbound call and start a multimedia session.

        Does nothing when the dialog has no INVITE transaction.

        Args:
            session_class: Session subclass to create for this call.
            **session_kwargs: Extra keyword arguments forwarded to `session_class`.
        """
        if self.invite_transaction is not None:
            self.invite_transaction.answer(
                session_class=session_class, **session_kwargs
            )

    def reject(self, status_code: types.SIPStatus = types.SIPStatus.BUSY_HERE) -> None:
        """
        Reject the inbound call with the given status code.

        Does nothing when the dialog has no INVITE transaction.

        Common status codes include:
            - [BUSY_HERE][voip.sip.types.SIPStatus.BUSY_HERE]: The remote party will hear a busy signal.
            - [DECLINE][voip.sip.types.SIPStatus.DECLINE]: The remote party will hear a decline signal.
            - [DOES_NOT_EXIST_ANYWHERE][voip.sip.types.SIPStatus.DOES_NOT_EXIST_ANYWHERE]: The remote party will hear a "The person you are trying to reach…" message.

        Args:
            status_code: SIP response status code (default: 486 Busy Here).
        """
        if self.invite_transaction is not None:
            self.invite_transaction.reject(status_code)

    async def bye(self) -> None:
        """End the call and terminate the dialog and multimedia session.

        Sends a BYE and waits up to `BYE_ACK_TIMEOUT` for the transaction to
        finish. A timeout is logged as a warning rather than raised. The
        dialog is removed from the parent protocol's registry in every case:
        success, timeout, failure or cancellation.
        """
        from voip.sip.transactions import ByeTransaction  # noqa: PLC0415

        try:
            await asyncio.wait_for(
                ByeTransaction.send(sip=self.sip, dialog=self),
                timeout=self.BYE_ACK_TIMEOUT.total_seconds(),
            )
        except TimeoutError:
            logger.warning(
                "BYE for dialog %s was not acknowledged within %r",
                self.call_id,
                self.BYE_ACK_TIMEOUT,
            )
        finally:
            # The dialog is over once a BYE has been attempted; dropping it
            # here keeps a failed or cancelled send from leaving a stale
            # entry in the registry.
            self.sip.drop_dialog(self)

    async def dial(
        self,
        target: SipURI,
        *,
        session_class: type[Session],
        **session_kwargs: typing.Any,
    ) -> None:
        """
        Initiate an outbound call to *target* by sending an INVITE [RFC 3261 §13.1].

        Args:
            target: SIP or tel URI of the remote party (e.g. ``"sip:+15551234567@carrier.com"`` or ``"tel:+15551234567"``).
            session_class: Session subclass to create for this call.
            **session_kwargs: Extra keyword arguments forwarded to `session_class`.

        [RFC 3261 §13.1]: https://datatracker.ietf.org/doc/html/rfc3261#section-13.1
        """
        from voip.sip.transactions import InviteTransaction  # noqa: PLC0415

        await InviteTransaction.send(
            sip=self.sip,
            target=target,
            dialog=self,
            session_class=session_class,
            **session_kwargs,
        )

    @classmethod
    def from_request(cls, request: messages.Request, **kwargs) -> Dialog:
        """Create a dialog from a request, extracting relevant headers.

        The request's `remote_tag` becomes the dialog's `local_tag` (a fresh
        UUID when it is empty) and the request's `local_tag` becomes the
        dialog's `remote_tag`.

        Args:
            request: The request to take `Call-ID`, tags and `Contact` from.
            **kwargs: Extra keyword arguments forwarded to the constructor.

        Returns:
            A new instance of *cls*.
        """
        # NOTE(review): `remote_contact` receives the raw `Contact` header
        # value; confirm it is already a `SipURI` as the field annotation says.
        return cls(
            call_id=request.headers["Call-ID"],
            local_tag=request.remote_tag or str(uuid.uuid4()),
            remote_tag=request.local_tag,
            remote_contact=request.headers.get("Contact"),
            **kwargs,
        )

call_received()

Called when an INVITE is received from the remote party.

Override in subclasses to answer, ring, or reject the call.

The base implementation rejects with a busy signal.

Source code in voip/sip/dialog.py
103
104
105
106
107
108
109
110
111
112
113
def call_received(self) -> None:
    """
    Called when the remote party sends an INVITE.

    The base implementation turns the call away by calling
    [reject][voip.sip.Dialog.reject] with its default status code.
    Subclasses override this hook to [ring][voip.sip.Dialog.ringing]
    and [answer][voip.sip.Dialog.answer] the call instead.
    """  # noqa: D401
    self.reject()

hangup_received()

Called when the remote party sends a BYE.

Override in subclasses to perform teardown.

Source code in voip/sip/dialog.py
115
116
117
118
119
120
def hangup_received(self) -> None:
    """
    Called when the remote party sends a BYE.

    Override in subclasses to perform teardown. The base implementation
    does nothing.
    """  # noqa: D401

ringing()

Send a ringing signal to the remote party.

This is optional but recommended for good user experience. If not called, the caller will hear silence until the call is accepted or rejected.

Source code in voip/sip/dialog.py
122
123
124
125
126
127
128
129
130
def ringing(self) -> None:
    """
    Tell the remote party that the call is ringing.

    Optional, but recommended for a good user experience: without it the
    caller hears silence until the call is accepted or rejected.

    Does nothing when the dialog has no INVITE transaction.
    """
    transaction = self.invite_transaction
    if transaction is None:
        return
    transaction.ringing()

answer(*, session_class, **session_kwargs)

Accept the inbound call and start a multimedia session.

Parameters:

Name Type Description Default
session_class type[Session]

Session subclass to create for this call.

required
**session_kwargs Any

Extra keyword arguments forwarded to session_class.

{}
Source code in voip/sip/dialog.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def answer(
    self, *, session_class: type[Session], **session_kwargs: typing.Any
) -> None:
    """
    Accept the inbound call and start a multimedia session.

    The request is handed to the dialog's INVITE transaction; nothing
    happens when there is none.

    Args:
        session_class: Session subclass to create for this call.
        **session_kwargs: Extra keyword arguments forwarded to `session_class`.
    """
    transaction = self.invite_transaction
    if transaction is None:
        return
    transaction.answer(session_class=session_class, **session_kwargs)

reject(status_code=types.SIPStatus.BUSY_HERE)

Reject the inbound call with the given status code.

Common status codes include:
  • BUSY_HERE: The remote party will hear a busy signal.
  • DECLINE: The remote party will hear a decline signal.
  • DOES_NOT_EXIST_ANYWHERE: The remote party will hear a "The person you are trying to reach…" message.

Parameters:

Name Type Description Default
status_code SIPStatus

SIP response status code (default: 486 Busy Here).

BUSY_HERE
Source code in voip/sip/dialog.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def reject(self, status_code: types.SIPStatus = types.SIPStatus.BUSY_HERE) -> None:
    """
    Turn the inbound call away with the given status code.

    The rejection is handed to the dialog's INVITE transaction; nothing
    happens when there is none.

    Common status codes include:
        - [BUSY_HERE][voip.sip.types.SIPStatus.BUSY_HERE]: The remote party will hear a busy signal.
        - [DECLINE][voip.sip.types.SIPStatus.DECLINE]: The remote party will hear a decline signal.
        - [DOES_NOT_EXIST_ANYWHERE][voip.sip.types.SIPStatus.DOES_NOT_EXIST_ANYWHERE]: The remote party will hear a "The person you are trying to reach…" message.

    Args:
        status_code: SIP response status code (default: 486 Busy Here).
    """
    transaction = self.invite_transaction
    if transaction is None:
        return
    transaction.reject(status_code)

dial(target, *, session_class, **session_kwargs) async

Initiate an outbound call to target.

Parameters:

Name Type Description Default
target SipURI

SIP or tel URI of the remote party (e.g. "sip:+15551234567@carrier.com" or "tel:+15551234567").

required
session_class type[Session]

Session subclass to create for this call.

required
**session_kwargs Any

Extra keyword arguments forwarded to session_class.

{}
Source code in voip/sip/dialog.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
async def dial(
    self,
    target: SipURI,
    *,
    session_class: type[Session],
    **session_kwargs: typing.Any,
) -> None:
    """
    Place an outbound call to *target* by sending an INVITE [RFC 3261 §13.1].

    Args:
        target: SIP or tel URI of the remote party (e.g. ``"sip:+15551234567@carrier.com"`` or ``"tel:+15551234567"``).
        session_class: Session subclass to create for this call.
        **session_kwargs: Extra keyword arguments forwarded to `session_class`.

    [RFC 3261 §13.1]: https://datatracker.ietf.org/doc/html/rfc3261#section-13.1
    """
    from voip.sip.transactions import InviteTransaction  # noqa: PLC0415

    # Build the INVITE first, then wait for it to finish.
    invite = InviteTransaction.send(
        sip=self.sip,
        target=target,
        dialog=self,
        session_class=session_class,
        **session_kwargs,
    )
    await invite

bye() async

End the call and terminate the dialog and multimedia session.

Source code in voip/sip/dialog.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
async def bye(self) -> None:
    """End the call and terminate the dialog and multimedia session.

    Sends a BYE and waits up to `BYE_ACK_TIMEOUT` for the transaction to
    finish. A timeout is logged as a warning rather than raised. The
    dialog is removed from the parent protocol's registry in every case:
    success, timeout, failure or cancellation.
    """
    from voip.sip.transactions import ByeTransaction  # noqa: PLC0415

    try:
        await asyncio.wait_for(
            ByeTransaction.send(sip=self.sip, dialog=self),
            timeout=self.BYE_ACK_TIMEOUT.total_seconds(),
        )
    except TimeoutError:
        logger.warning(
            "BYE for dialog %s was not acknowledged within %r",
            self.call_id,
            self.BYE_ACK_TIMEOUT,
        )
    finally:
        # The dialog is over once a BYE has been attempted; dropping it
        # here keeps a failed or cancelled send from leaving a stale
        # entry in the registry.
        self.sip.drop_dialog(self)

voip.sip.SessionInitiationProtocol dataclass

Bases: Protocol

SIP User Agent Client (UAC) over TLS/TCP [RFC 3261].

Handles SIP message parsing, carrier registration, and transaction management.

Example

You can use the handler like any asyncio.Protocol in Python.

import asyncio

from voip.sip import SessionInitiationProtocol

async def main():
    loop = asyncio.get_running_loop()

    transport, protocol = await loop.create_connection(
        SessionInitiationProtocol,
        '0.0.0.0', 5060)

    try:
        await asyncio.Future()
    finally:
        transport.close()


asyncio.run(main())

However, this example is incomplete, since the protocol will require some arguments, like a reference to the RTP protocol and an AOR.

Note

The support is limited to UAC (client mode). This library currently does not implement server (UAS) functionality.

Parameters:

Name Type Description Default
aor SipURI

SIP Address of Record (AOR) to register with the carrier.

required
rtp RealtimeTransportProtocol

Shared RTP mux for call media.

required
dialog_class type[Dialog]

Dialog subclass used to create dialogs for incoming calls. Defaults to the base Dialog which rejects all calls with 486 Busy Here.

Dialog
keepalive_interval timedelta

Keep-alive ping interval. Should be between 30 and 90 seconds.

timedelta(seconds=30)
Source code in voip/sip/protocol.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
@dataclasses.dataclass(kw_only=True, slots=True)
class SessionInitiationProtocol(asyncio.Protocol):
    """
    SIP User Agent Client (UAC) over TLS/TCP [RFC 3261].

    Handles SIP message parsing, carrier registration, and transaction management.

    Example:
        You can use the handler like any [asyncio.Protocol][asyncio.Protocol] in Python.

        ```python
        import asyncio

        from voip.sip import SessionInitiationProtocol

        async def main():
            loop = asyncio.get_running_loop()

            transport, protocol = await loop.create_connection(
                SessionInitiationProtocol,
                '0.0.0.0', 5060)

            try:
                await asyncio.Future()
            finally:
                transport.close()


        asyncio.run(main())
        ```

        However, this example is incomplete, since the protocol will require some
        arguments, like a reference to the RTP protocol and an AOR.

    > [!Note]
    > The support is limited to UAC (client mode).
    > This library currently does not implement server (UAS) functionality.

    [RFC 3261]: https://datatracker.ietf.org/doc/html/rfc3261

    Args:
        aor: SIP Address of Record (AOR) to register with the carrier.
        rtp: Shared RTP mux for call media.
        dialog_class: [Dialog][voip.sip.Dialog] subclass used to
            create dialogs for incoming calls.  Defaults to the base
            [Dialog][voip.sip.Dialog] which rejects all calls with
            ``486 Busy Here``.
        keepalive_interval: Keep-alive ping interval. Should be between 30 and 90 seconds.

    """

    aor: types.SipURI
    rtp: RealtimeTransportProtocol
    dialog_class: type[Dialog] = dataclasses.field(default=Dialog)
    keepalive_interval: datetime.timedelta = datetime.timedelta(seconds=30)

    keepalive_task: asyncio.Task | None = dataclasses.field(init=False, default=None)
    public_address: NetworkAddress = None
    _dialogs: dict[tuple[str, str], Dialog] = dataclasses.field(
        init=False, default_factory=dict
    )
    _transactions: dict[str, Transaction] = dataclasses.field(
        init=False, default_factory=dict
    )
    disconnected_event: asyncio.Event = dataclasses.field(
        init=False, default_factory=asyncio.Event
    )
    registered_event: asyncio.Event = dataclasses.field(
        init=False, default_factory=asyncio.Event
    )
    transport: asyncio.Transport | None = dataclasses.field(init=False, default=None)
    is_secure: bool = dataclasses.field(init=False, default=False)
    recv_buffer: bytearray = dataclasses.field(init=False, default_factory=bytearray)
    ready_callback: typing.Callable[[], None] | None = dataclasses.field(
        default=None, repr=False, compare=False
    )

    def __post_init__(self) -> None:
        """Fall back to the RTP protocol's public address when none was given."""
        if not self.public_address:
            self.public_address = self.rtp.public_address

    @classmethod
    async def run(
        cls,
        fn: typing.Callable[[], None],
        aor: types.SipURI,
        dialog_class: type[Dialog] = Dialog,
        *,
        no_verify_tls: bool = False,
        stun_server: NetworkAddress | None = None,
    ) -> SessionInitiationProtocol:
        """Run a SIP session and call *fn* once registered.

        Establishes RTP and SIP/TLS connections derived from *aor*, then
        **suspends until SIP registration is confirmed** before returning the
        ready protocol.  After this call returns, the MCP server (or any other
        caller) may safely place outbound calls.

        The transport protocol (TLS vs plain TCP) and proxy address are read
        from *aor* directly — no extra arguments are needed.

        If connecting or waiting for registration fails or is cancelled, the
        transports opened so far are closed before the error propagates.

        Args:
            fn: Called when the SIP session is registered, before
                `run` returns. Receives no arguments. May use
                [`asyncio.create_task`][] for async work.
            aor: SIP Address of Record, e.g. ``sip:alice@carrier.example``.
                The host, port, and ``transport`` parameter are used to connect
                to the SIP proxy.
            dialog_class: [`Dialog`][voip.sip.Dialog] subclass used for
                inbound calls. Defaults to the base
                [`Dialog`][voip.sip.Dialog], which rejects all calls.
            no_verify_tls: Disable TLS certificate verification. Insecure; for
                testing only. Defaults to ``False``.
            stun_server: STUN server for RTP NAT traversal. Passed through
                unchanged to the RTP protocol; ``None`` presumably selects
                its default (documented as ``stun.cloudflare.com:3478`` —
                TODO confirm).

        Returns:
            The registered [`SessionInitiationProtocol`][voip.sip.protocol.SessionInitiationProtocol]
            instance, ready to place calls.
        """
        loop = asyncio.get_running_loop()
        maddr = aor.maddr

        # Bind the RTP socket in the same address family as the proxy.
        is_ipv6 = isinstance(maddr[0], ipaddress.IPv6Address)
        rtp_bind_address = "::" if is_ipv6 else "0.0.0.0"  # noqa: S104
        rtp_transport, rtp_protocol = await loop.create_datagram_endpoint(
            lambda: RealtimeTransportProtocol(stun_server_address=stun_server),
            local_addr=(rtp_bind_address, 0),
        )

        try:
            ssl_context: ssl.SSLContext | None = None
            if aor.transport == "TLS":
                ssl_context = ssl.create_default_context()
                if no_verify_tls:
                    ssl_context.check_hostname = False
                    ssl_context.verify_mode = ssl.CERT_NONE

            _, protocol = await loop.create_connection(
                lambda: cls(
                    aor=aor,
                    rtp=rtp_protocol,
                    dialog_class=dialog_class,
                    ready_callback=fn,
                ),
                host=str(maddr[0]),
                port=maddr[1],
                ssl=ssl_context,
            )
            try:
                await protocol.registered_event.wait()
            except BaseException:
                # The caller never receives the protocol, so nobody else
                # could close its transport.
                protocol.close()
                raise
        except BaseException:
            # Do not leak the RTP socket when the SIP side never came up.
            rtp_transport.close()
            raise
        return protocol

    def register_dialog(self, dialog: Dialog) -> None:
        """Store *dialog* under the key ``(dialog.local_tag, dialog.remote_tag)``.

        A dialog whose remote tag is still `None` is not stored; a warning
        is logged instead.
        """
        remote_tag = dialog.remote_tag
        if remote_tag is None:
            logger.warning("Dialog without remote tag cannot be registered: %r", dialog)
            return
        self._dialogs[dialog.local_tag, remote_tag] = dialog

    def drop_dialog(self, dialog: Dialog) -> None:
        """Remove *dialog* from the registry.

        Logs a warning, and changes nothing, when the dialog has no remote
        tag or is not registered under its tag pair.
        """
        remote_tag = dialog.remote_tag
        if remote_tag is None:
            logger.warning("Dialog without remote tag cannot be removed: %r", dialog)
            return
        key = (dialog.local_tag, remote_tag)
        try:
            del self._dialogs[key]
        except KeyError:
            logger.warning("Dialog not found for removal: %r", dialog)

    def register_transaction(self, tx: Transaction) -> None:
        """Store *tx* in the transaction registry under its ``branch`` value."""
        branch = tx.branch
        self._transactions[branch] = tx

    def drop_transaction(self, tx: Transaction) -> None:
        """Remove *tx* from the registry, logging a warning if it is unknown."""
        try:
            self._transactions.pop(tx.branch)
        except KeyError:
            logger.warning("Transaction not found for removal: %r", tx)

    def connection_made(self, transport: asyncio.Transport) -> None:  # type: ignore[override]
        """Store the transport, then start carrier registration and keep-alives.

        Records whether the connection is TLS-protected in `is_secure`.
        When no event loop is running (synchronous test setups) only the
        transport is stored and no tasks are started.

        Args:
            transport: The connected TLS/TCP transport.
        """
        self.transport = transport
        self.is_secure = transport.get_extra_info("ssl_object") is not None
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return  # no running loop in synchronous test setups
        # Kept outside the `try` so that a RuntimeError raised while setting
        # up the registration is reported instead of silently swallowed.
        tx = RegistrationTransaction(sip=self, method=SIPMethod.REGISTER)
        self.register_transaction(tx)
        loop.create_task(self.handle_registration(tx))
        self.keepalive_task = loop.create_task(self.send_keepalive())

    async def send_keepalive(self) -> None:
        """Write a `PING` frame every `keepalive_interval` until the transport is gone.

        The loop ends as soon as `transport` is found to be `None` after a
        sleep.
        """
        while True:
            await asyncio.sleep(self.keepalive_interval.total_seconds())
            transport = self.transport
            if transport is None:
                break
            logger.info("PING", extra={"addr": self.public_address})
            transport.write(PING)

    async def handle_registration(self, tx: RegistrationTransaction) -> None:
        """Wait for *tx* to complete, then call `on_registered`.

        Args:
            tx: The registration transaction to await.
        """
        # If awaiting the transaction raises, on_registered() is never called.
        await tx
        self.on_registered()

    def data_received(self, data: bytes) -> None:
        """Buffer *data* and dispatch every complete frame now available.

        Args:
            data: Bytes read from the transport.
        """
        self.recv_buffer += data
        for complete_frame in self._extract_frames():
            self._dispatch_frame(complete_frame)

    def _extract_frames(self) -> typing.Generator[memoryview | bytes]:  # noqa: C901
        """Yield each complete frame at the head of `recv_buffer`, consuming it.

        A frame is either a full SIP message (headers plus the number of body
        bytes announced by ``Content-Length``), a `PING` or a `PONG`.
        Iteration stops as soon as the buffer starts with an incomplete
        frame; the remaining bytes stay buffered for the next call.

        The ``Content-Length`` header is recognised in its long form and in
        its compact form ``l`` ([RFC 3261 §7.3.3]). A missing, unparsable or
        negative value is treated as ``0``.

        [RFC 3261 §7.3.3]: https://datatracker.ietf.org/doc/html/rfc3261#section-7.3.3

        Yields:
            A `memoryview` over the buffer for SIP messages. It is released
            as soon as the generator is resumed, so consumers must copy what
            they need first. `PING` and `PONG` are yielded as constants.
        """
        while self.recv_buffer:
            if self.recv_buffer[0:1] != b"\r":
                # SIP message: wait for the header-body separator.
                header_end = self.recv_buffer.find(b"\r\n\r\n")
                if header_end == -1:
                    break  # incomplete headers – wait for more data
                content_length = 0
                # Skip the start line; only header lines can carry the length.
                for line in self.recv_buffer[:header_end].split(b"\r\n")[1:]:
                    name, sep, value = line.partition(b":")
                    if sep and name.strip().lower() in (b"content-length", b"l"):
                        try:
                            # A negative length would move the frame end
                            # backwards and corrupt framing (or never
                            # consume anything), so clamp it to zero.
                            content_length = max(0, int(value.strip()))
                        except ValueError:
                            pass
                        break
                message_end = header_end + 4 + content_length
                if len(self.recv_buffer) < message_end:
                    break  # incomplete body – wait for more data
                frame = memoryview(self.recv_buffer)[:message_end]
                yield frame
                # The buffer cannot be resized while a view is exported.
                frame.release()
                del self.recv_buffer[:message_end]
            elif len(self.recv_buffer) >= 4 and self.recv_buffer[:4] == PING:
                yield PING
                del self.recv_buffer[:4]
            elif len(self.recv_buffer) >= 3 and self.recv_buffer[2:3] == b"\r":
                # Third byte is CR – could be the start of PING; wait for 4th byte.
                break
            elif self.recv_buffer[:2] == PONG:
                yield PONG
                del self.recv_buffer[:2]
            else:
                # Single CR or other incomplete sequence – wait for more data.
                break

    def _dispatch_frame(self, frame: memoryview | bytes) -> None:
        peer = NetworkAddress(*self.transport.get_extra_info("peername"))
        if frame == PONG:
            logger.info("PONG", extra={"addr": peer})
        elif frame == PING:
            logger.info("PING", extra={"addr": peer})
            if self.transport:
                logger.info("PONG", extra={"addr": self.public_address})
                self.transport.write(PONG)
        else:
            match Message.parse(bytes(frame)):
                case Request() as request:
                    logger.info(
                        "Request received: %r",
                        request,
                        extra={"addr": peer},
                    )
                    self.request_received(request)
                case Response() as response:
                    logger.info(
                        "Response received %r",
                        response,
                        extra={"addr": peer},
                    )
                    self.response_received(response)

    def send(self, message: Response | Request) -> None:
        """Serialize *message* and write it to the transport.

        A ``User-Agent`` header is added unless the message already has one.
        Without a transport the message is not written.

        Args:
            message: The SIP request or response to send.
        """
        logger.debug("Sending %r", message)
        message.headers.setdefault("User-Agent", USER_AGENT)
        transport = self.transport
        if transport is None:
            return
        transport.write(bytes(message))

    def close(self) -> None:
        """Close the TLS/TCP transport, if one is attached.

        NOTE(review): only the SIP transport is closed here; the RTP mux is
        not touched by this method — confirm it is shut down elsewhere.
        """
        transport = self.transport
        if transport is None:
            return
        transport.close()

    @property
    def allowed_methods(self) -> frozenset[SIPMethod]:
        """The set of SIP methods this UA handles.

        A new frozenset is built on every access.
        """
        supported = (
            SIPMethod.INVITE,
            SIPMethod.ACK,
            SIPMethod.BYE,
            SIPMethod.CANCEL,
            SIPMethod.OPTIONS,
        )
        return frozenset(supported)

    @property
    def allow_header(self) -> str:
        """Comma-separated Allow header value in SIPMethod enum order."""
        # Read the property once: each access builds a new frozenset, which
        # the original did for every member of the enum.
        allowed = self.allowed_methods
        return ",".join(m for m in SIPMethod if m in allowed)

    def method_not_allowed(self, request: Request) -> None:
        """Answer *request* with 405 Method Not Allowed.

        The response echoes the request's ``Via``, ``To``, ``From``,
        ``Call-ID`` and ``CSeq`` headers and adds an ``Allow`` header listing
        the supported methods. Override to customise the error response or
        add logging.

        Args:
            request: The unhandled SIP request.
        """
        logger.warning("SIP method %r is not supported", request.method)
        echoed_names = ("Via", "To", "From", "Call-ID", "CSeq")
        response_headers = {}
        for header_name, header_value in request.headers.items():
            if header_name in echoed_names:
                response_headers[header_name] = header_value
        response_headers["Allow"] = self.allow_header
        status = SIPStatus.METHOD_NOT_ALLOWED
        self.send(
            Response(
                status_code=status,
                phrase=status.phrase,
                headers=response_headers,
            ),
        )

    def request_received(self, request: Request) -> None:
        """Dispatch an incoming SIP request to the appropriate transaction."""
        match request.method:
            case SIPMethod.INVITE:
                asyncio.create_task(
                    InviteTransaction.receive(request=request, sip=self)
                )
            case SIPMethod.ACK:
                # For non-2xx ACKs the INVITE tx is still present; route by branch.
                try:
                    tx = self._transactions[request.branch]
                except KeyError:
                    self.send(
                        Response.from_request(
                            request,
                            status_code=SIPStatus.GONE,
                            phrase=SIPStatus.GONE.phrase,
                        )
                    )
                else:
                    tx.ack_received(request)
            case SIPMethod.BYE:
                asyncio.create_task(ByeTransaction.receive(request=request, sip=self))
            case SIPMethod.CANCEL:
                try:
                    tx = self._transactions[request.branch]
                except KeyError:
                    self.send(
                        Response.from_request(
                            request,
                            status_code=SIPStatus.GONE,
                            phrase=SIPStatus.GONE.phrase,
                        )
                    )
                    return
                tx.cancel_received(request)
            case SIPMethod.OPTIONS:
                self.send(
                    Response.from_request(
                        request,
                        status_code=SIPStatus.OK,
                        phrase=SIPStatus.OK.phrase,
                        headers={"Allow": self.allow_header},
                    )
                )
            case _:
                self.method_not_allowed(request)

    def response_received(self, response: Response) -> None:
        """Route an incoming SIP response to the transaction owning its branch.

        A response whose branch matches no tracked transaction is logged as
        a warning and otherwise ignored.

        Args:
            response: The parsed SIP response.
        """
        branch = response.branch
        try:
            transaction = self._transactions[branch]
        except KeyError:
            logger.warning(
                "Received response with unknown branch %r: %r",
                branch,
                response,
            )
            return
        transaction.response_received(response)

    def on_registered(self) -> None:
        """Signal that registration with the carrier succeeded.

        Sets `registered_event` and then invokes `ready_callback` when one
        is configured.  Subclasses may override this hook to initiate
        outbound calls or start other post-registration activity; an override
        that skips this implementation will not perform that signalling.
        """
        self.registered_event.set()
        notify = self.ready_callback
        if notify is None:
            return
        notify()

    @property
    def contact(self) -> str:
        """Return a ``Contact:`` header value for this UA.

        The address is ``user@public_address`` when the AOR has a user part
        and just the public address otherwise.  The URI scheme mirrors `aor`:
        a ``sips:`` AOR produces a ``sips:`` Contact without a transport
        parameter; any other AOR produces ``sip:`` with ``transport=tls`` when
        `is_secure` is truthy and ``transport=tcp`` when it is not.

        The ``ob`` URI parameter ([RFC 5626 §5]) is always appended inside
        the angle brackets to advertise outbound keep-alive support to the
        registrar.

        [RFC 5626 §5]: https://datatracker.ietf.org/doc/html/rfc5626#section-5
        """
        outbound = ";ob"
        user = self.aor.user
        if user:
            address = f"{user}@{self.public_address}"
        else:
            address = str(self.public_address)
        if self.aor.scheme == "sips":
            return f"<sips:{address}{outbound}>"
        if self.is_secure:
            transport = ";transport=tls"
        else:
            transport = ";transport=tcp"
        return f"<sip:{address}{transport}{outbound}>"

    def connection_lost(self, exc: Exception | None) -> None:
        """Tear down connection state after the TLS/TCP connection is lost.

        Logs `exc` when one is given, cancels and clears the keep-alive task
        if there is one, drops the transport reference and finally sets
        `disconnected_event`.

        Args:
            exc: The error that caused the loss, or ``None`` when no error
                is reported.
        """
        if exc is not None:
            logger.exception("Connection lost", exc_info=exc)
        pending_keepalive = self.keepalive_task
        if pending_keepalive is not None:
            pending_keepalive.cancel()
            self.keepalive_task = None
        self.transport = None
        self.disconnected_event.set()

Types

voip.sip.SipURI

Bases: str

A parsed SIP or SIPS URI per RFC 3261 §19.1.

Format: sip:user:password@host:port;uri-parameters?headers

Behaves as a plain str holding the canonical URI, so instances can be stored in header dicts unchanged. The parse classmethod decodes a raw SIP URI string into structured fields. IPv6 addresses in the host part must be enclosed in square brackets per RFC 2732 (e.g. sip:alice@[::1]:5060); the stored host is the bare address without brackets.

Examples:

>>> SipURI.parse("sip:alice@example.com")
'sip:alice@example.com:5060'
>>> SipURI.parse("sips:+15551234567@carrier.com:5061")
'sips:%2B15551234567@carrier.com:5061'
>>> SipURI.parse("sip:alice@[::1]:5060")
'sip:alice@[::1]:5060'
Source code in voip/sip/types.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class SipURI(str):
    """A parsed SIP or SIPS URI per [RFC 3261 §19.1].

    Format: `sip:user:password@host:port;uri-parameters?headers`

    Behaves as a plain `str` holding the canonical URI, so instances can be
    stored in header dicts unchanged.  The canonical form always carries an
    explicit port (5061 for ``sips``, 5060 otherwise, unless one is given).
    The `parse` classmethod decodes a raw SIP URI string into structured
    fields.  IPv6 addresses in the host part must be enclosed in square
    brackets per [RFC 2732] (e.g. ``sip:alice@[::1]:5060``); the stored
    `host` is the bare address without brackets.

    [RFC 3261 §19.1]: https://datatracker.ietf.org/doc/html/rfc3261#section-19.1
    [RFC 2732]: https://datatracker.ietf.org/doc/html/rfc2732

    Examples:
        >>> SipURI.parse("sip:alice@example.com")
        'sip:alice@example.com:5060'
        >>> SipURI.parse("sips:+15551234567@carrier.com:5061")
        'sips:%2B15551234567@carrier.com:5061'
        >>> SipURI.parse("sip:alice@[::1]:5060")
        'sip:alice@[::1]:5060'

    """

    __slots__ = ("scheme", "host", "user", "password", "port", "parameters", "headers")

    SIP_URL_PATTERN: typing.ClassVar[re.Pattern[str]] = re.compile(
        r"^(?P<scheme>sips?):"
        r"((?P<user>[^@;:]+)(?P<password>:[^@;]*)?@)?"
        r"(?P<host>(\[[0-9a-fA-F:]+]|[^;?:@\[\]]+))"
        r"(?P<port>:[0-9]+)?"
        r"(?P<parameters>;[^?]+)?"
        r"(?P<headers>\?[^?]+)?$",
        re.IGNORECASE,
    )

    def __new__(
        cls,
        scheme: str,
        host: str | ipaddress.IPv6Address | ipaddress.IPv4Address,
        user: str | None = None,
        password: str | None = None,
        port: int | None = None,
        parameters: dict[str, str | None] | None = None,
        headers: dict[str, str] | None = None,
    ) -> SipURI:
        """Build the canonical URI string and attach the structured fields.

        Args:
            scheme: URI scheme, ``"sip"`` or ``"sips"``.
            host: Host name or IP address; values that parse as an IP
                address are stored as `ipaddress` objects.
            user: Unescaped user part, if any.
            password: Unescaped password; only emitted together with `user`.
            port: Port number; defaults to 5061 for ``sips`` and 5060
                otherwise.
            parameters: URI parameters; a ``None`` value produces a
                valueless parameter such as ``;lr``.
            headers: URI headers appended after ``?``.
        """
        try:
            host = ipaddress.ip_address(host)
        except ValueError:
            # Not an IP literal: keep the host name as given.
            pass
        port = port if port is not None else (5061 if scheme == "sips" else 5060)
        parameters = parameters or {}
        headers = headers or {}
        parts = [f"{scheme}:"]
        if user:
            parts.append(urllib.parse.quote(user))
            if password:
                parts.append(f":{urllib.parse.quote(password)}")
            parts.append("@")
        # IPv6 literals must be bracketed so the port separator is unambiguous.
        parts.append(
            f"[{host}]" if isinstance(host, ipaddress.IPv6Address) else str(host)
        )
        parts.append(f":{port}")
        for name, val in parameters.items():
            parts.append(
                f";{urllib.parse.quote(name)}={urllib.parse.quote(val)}"
                if val is not None
                else f";{urllib.parse.quote(name)}"
            )
        if headers:
            parts.append("?")
            parts.append(
                "&".join(
                    f"{urllib.parse.quote(name)}={urllib.parse.quote(val)}"
                    for name, val in headers.items()
                )
            )
        instance = super().__new__(cls, "".join(parts))
        instance.scheme = scheme
        instance.host = host
        instance.user = user
        instance.password = password
        instance.port = port
        instance.parameters = parameters
        instance.headers = headers
        return instance

    @classmethod
    def parse(cls, value: str) -> SipURI:
        """
        Parse a SIP or SIPS URI string into a `SipURI` instance.

        Args:
            value: The raw URI, e.g. ``sip:alice@example.com;transport=tcp``.

        Returns:
            Parsed `SipURI` instance.

        Raises:
            ValueError: When the URI is malformed (missing scheme, invalid
                characters, unclosed IPv6 bracket, empty host, or invalid port).
        """
        if match := cls.SIP_URL_PATTERN.fullmatch(value):
            host = match.group("host")
            if host.startswith("[") and host.endswith("]"):
                host = host[1:-1]
            host = urllib.parse.unquote(host)

            return cls(
                scheme=match.group("scheme").lower(),
                user=urllib.parse.unquote(match.group("user"))
                if match.group("user")
                else None,
                host=host,
                # The password and port groups include their leading ":".
                password=urllib.parse.unquote(match.group("password")[1:])
                if match.group("password")
                else None,
                port=int(match.group("port")[1:]) if match.group("port") else None,
                parameters=dict(cls._parse_parameters(match.group("parameters")))
                if match.group("parameters")
                else {},
                headers=dict(cls._parse_headers(match.group("headers")[1:]))
                if match.group("headers")
                else {},
            )
        raise ValueError(f"Invalid SIP URI: {value!r}")

    @classmethod
    def _parse_parameters(cls, params: str) -> Iterator[tuple[str, str | None]]:
        """Yield unquoted ``(name, value)`` pairs from ``;a=b;c``.

        `params` includes its leading ``;``.  Valueless parameters yield
        ``None`` as value; empty segments are skipped.
        """
        for part in params[1:].split(";"):
            if "=" in part:
                name, val = part.split("=", 1)
                yield urllib.parse.unquote(name), urllib.parse.unquote(val)
            elif part:
                yield urllib.parse.unquote(part), None

    @classmethod
    def _parse_headers(cls, headers: str) -> Iterator[tuple[str, str]]:
        """Yield unquoted ``(name, value)`` pairs from ``a=b&c``.

        `headers` excludes the leading ``?``.  Valueless headers yield an
        empty string as value; empty segments are skipped.
        """
        for part in headers.split("&"):
            if "=" in part:
                name, val = part.split("=", 1)
                yield urllib.parse.unquote(name), urllib.parse.unquote(val)
            elif part:
                yield urllib.parse.unquote(part), ""

    @property
    def maddr(self) -> NetworkAddress:
        """Address from the ``maddr`` parameter, else this URI's host and port."""
        try:
            return NetworkAddress.parse(self.parameters["maddr"])
        except KeyError:
            return NetworkAddress(self.host, self.port)

    @property
    def ttl(self) -> int | None:
        """Value of the ``ttl`` parameter, or ``None`` when absent or valueless."""
        try:
            return int(self.parameters["ttl"])
        except (KeyError, TypeError):
            # TypeError: a valueless ``;ttl`` is stored as None.
            return None

    @property
    def transport(self) -> str:
        """Upper-case transport name.

        ``sips`` URIs always report ``"TLS"``.  ``sip`` URIs report their
        ``transport`` parameter, falling back to ``"TLS"`` when the parameter
        is absent or has no value.
        """
        if self.scheme != "sip":
            return "TLS"
        # `or` also covers a valueless ``;transport`` that is stored as None.
        return (self.parameters.get("transport") or "TLS").upper()

parse(value) classmethod

Parse a SIP or SIPS URI string into a SipURI instance.

Returns:

Type Description
SipURI

Parsed SipURI instance.

Raises:

Type Description
ValueError

When the URI is malformed (missing scheme, invalid characters, unclosed IPv6 bracket, empty host, or invalid port).

Source code in voip/sip/types.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@classmethod
def parse(cls, value: str) -> SipURI:
    """
    Build a `SipURI` from the textual form of a SIP or SIPS URI.

    Args:
        value: The raw URI string to decode.

    Returns:
        The `SipURI` built from the decoded components.

    Raises:
        ValueError: When `value` does not match `SIP_URL_PATTERN` (missing
            scheme, invalid characters, unclosed IPv6 bracket, empty host,
            or invalid port).
    """
    found = cls.SIP_URL_PATTERN.fullmatch(value)
    if found is None:
        raise ValueError(f"Invalid SIP URI: {value!r}")

    raw_user, raw_password, raw_port, raw_params, raw_headers = found.group(
        "user", "password", "port", "parameters", "headers"
    )

    # The stored host is the bare address, without IPv6 brackets.
    bare_host = found.group("host")
    if bare_host.startswith("[") and bare_host.endswith("]"):
        bare_host = bare_host[1:-1]

    # Password, port and headers are captured with their leading delimiter.
    user = urllib.parse.unquote(raw_user) if raw_user else None
    password = urllib.parse.unquote(raw_password[1:]) if raw_password else None
    port = int(raw_port[1:]) if raw_port else None
    parameters = dict(cls._parse_parameters(raw_params)) if raw_params else {}
    headers = dict(cls._parse_headers(raw_headers[1:])) if raw_headers else {}

    return cls(
        scheme=found.group("scheme").lower(),
        user=user,
        host=urllib.parse.unquote(bare_host),
        password=password,
        port=port,
        parameters=parameters,
        headers=headers,
    )

voip.sip.CallerID

Bases: str

SIP From/To header value with structured access and privacy-safe repr.

Behaves as a plain str so it is wire-format compatible and can be stored in header dicts unchanged. repr() returns a short anonymized form that shows only the last four characters of the user part and the carrier domain — useful for log messages.

Examples:

>>> str(CallerID('"08001234567" <sip:08001234567@telefonica.de>;tag=abc'))
'"08001234567" <sip:08001234567@telefonica.de>;tag=abc'
>>> repr(CallerID('"08001234567" <sip:08001234567@telefonica.de>;tag=abc'))
'*******4567@telefonica.de'
>>> repr(CallerID('sip:alice@example.com'))
'*lice@example.com'
Source code in voip/sip/types.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
class CallerID(str):
    """SIP From/To header value with structured access and privacy-safe repr.

    Behaves as a plain ``str`` so it is wire-format compatible and can be
    stored in header dicts unchanged.  ``repr()`` returns a short anonymized
    form: every character but the last four of the display name (or, when
    there is none, of the user part) is replaced by ``*``, followed by the
    carrier domain — useful for log messages.

    Examples:
        >>> str(CallerID('"08001234567" <sip:08001234567@telefonica.de>;tag=abc'))
        '"08001234567" <sip:08001234567@telefonica.de>;tag=abc'
        >>> repr(CallerID('"08001234567" <sip:08001234567@telefonica.de>;tag=abc'))
        '*******4567@telefonica.de'
        >>> repr(CallerID('sip:alice@example.com'))
        '*lice@example.com'
    """

    @property
    def display_name(self) -> str | None:
        """Display name from the From/To header, if present.

        Both the quoted (``"Alice" <...>``) and the unquoted
        (``Alice <...>``) forms are recognized.
        """
        if m := re.match(r'^"([^"]+)"\s*<|^([^<"]+?)\s*<', self):
            return (m.group(1) or m.group(2) or "").strip() or None
        return None

    @property
    def uri(self) -> SipURI | None:
        """Parsed URI embedded in the header value, if present.

        Raises:
            ValueError: Propagated from `SipURI.parse` when the embedded URI
                cannot be parsed.  The search pattern also matches ``tel:``
                URIs, which `SipURI.parse` does not accept.
        """
        if m := re.search(r"<?((?:sips?|tel):[^>\s]+)>?", self):
            return SipURI.parse(m.group(1))
        return None

    @property
    def user(self) -> str | None:
        """SIP user part (phone number or username)."""
        if m := re.search(r"sips?:([^@>;\s]+)@", self):
            return m.group(1)
        return None

    @property
    def host(self) -> str | None:
        """Carrier domain extracted from the SIP URI."""
        if m := re.search(r"sips?:[^@>;\s]+@([^>;)\s,]+)", self):
            return m.group(1)
        return None

    @property
    def tag(self) -> str | None:
        """Dialog tag parameter value, if present."""
        m = re.search(r";tag=([^\s;]+)", self)
        return m.group(1) if m else None

    def __repr__(self) -> str:
        # Prefer the display name; fall back to the user part of the URI.
        user = self.display_name or self.user or ""
        host = self.host or ""
        # Keep the last four characters and star out everything before them.
        masked = ("*" * max(0, len(user) - 4)) + user[-4:] if user else "****"
        return f"{masked}@{host}" if host else masked

display_name property

Display name from the From/To header, if present.

host property

Carrier domain extracted from the SIP URI.

tag property

Dialog tag parameter value, if present.

uri property

Parsed SIP or tel URI embedded in the header value, if present.

user property

SIP user part (phone number or username).

voip.sip.SIPStatus

Bases: IntEnum

SIP Status Codes based on RFC 3261.

Source code in voip/sip/types.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
class SIPStatus(enum.IntEnum):
    """SIP Status Codes based on [RFC 3261].

    Each member is defined as ``(code, phrase)``: the member compares equal
    to the integer code and exposes the default reason phrase as `phrase`.

    [RFC 3261]: https://datatracker.ietf.org/doc/html/rfc3261#section-21
    """

    def __new__(cls, value: int, phrase: str) -> SIPStatus:
        # Only the numeric code becomes the enum value; the phrase is kept
        # as an extra attribute on the member.
        obj = int.__new__(cls, value)
        obj._value_ = value
        obj.phrase = phrase
        return obj

    TRYING = 100, "Trying"
    """The request has been received and some unspecified action is being taken; no final response is available yet."""

    RINGING = 180, "Ringing"
    """The UA receiving the INVITE is trying to alert the user."""

    CALL_IS_BEING_FORWARDED = 181, "Call Is Being Forwarded"
    """The call is being forwarded to a different set of destinations."""

    QUEUED = 182, "Queued"
    """The called party is temporarily unavailable, and the server has queued the call instead of rejecting it."""

    SESSION_PROGRESS = 183, "Session Progress"
    """Conveys information about the progress of the call that is not otherwise classified."""

    OK = 200, "OK"
    """The request has succeeded."""

    MULTIPLE_CHOICES = 300, "Multiple Choices"
    """The address in the request resolved to several choices, each with its own specific location."""

    MOVED_PERMANENTLY = 301, "Moved Permanently"
    """The user can no longer be found at the Request-URI; the client should retry at the new address given in the Contact header field."""

    MOVED_TEMPORARILY = 302, "Moved Temporarily"
    """The client should retry the request at the new address(es) given in the Contact header field."""

    USE_PROXY = 305, "Use Proxy"
    """The requested resource must be accessed through the proxy given in the Contact header field."""

    ALTERNATIVE_SERVICE = 380, "Alternative Service"
    """The call was not successful, but alternative services are possible."""

    BAD_REQUEST = 400, "Bad Request"
    """The request could not be understood due to malformed syntax."""

    UNAUTHORIZED = 401, "Unauthorized"
    """The request requires user authentication."""

    PAYMENT_REQUIRED = 402, "Payment Required"
    """Reserved for future use."""

    FORBIDDEN = 403, "Forbidden"
    """The server understood the request but refuses to fulfill it."""

    NOT_FOUND = 404, "Not Found"
    """The user does not exist at the domain specified in the Request-URI."""

    METHOD_NOT_ALLOWED = 405, "Method Not Allowed"
    """The method is understood but not allowed for the address identified by the Request-URI."""

    NOT_ACCEPTABLE = 406, "Not Acceptable"
    """The resource can only generate responses that are not acceptable according to the Accept header field of the request."""

    PROXY_AUTHENTICATION_REQUIRED = 407, "Proxy Authentication Required"
    """The client must first authenticate itself with the proxy."""

    REQUEST_TIMEOUT = 408, "Request Timeout"
    """The server could not produce a response within a suitable amount of time."""

    GONE = 410, "Gone"
    """The requested resource is no longer available at the server and no forwarding address is known."""

    REQUEST_ENTITY_TOO_LARGE = 413, "Request Entity Too Large"
    """The request body is larger than the server is willing or able to process."""

    REQUEST_URI_TOO_LONG = 414, "Request-URI Too Long"
    """The Request-URI is longer than the server is willing to interpret."""

    UNSUPPORTED_MEDIA_TYPE = 415, "Unsupported Media Type"
    """The message body of the request is in a format not supported by the server for the requested method."""

    UNSUPPORTED_URI_SCHEME = 416, "Unsupported URI Scheme"
    """The scheme of the URI in the Request-URI is unknown to the server."""

    BAD_EXTENSION = 420, "Bad Extension"
    """The server did not understand the protocol extension specified in a Proxy-Require or Require header field."""

    EXTENSION_REQUIRED = 421, "Extension Required"
    """The UAS needs a particular extension to process the request, but this extension is not listed in a Supported header field in the request."""

    INTERVAL_TOO_BRIEF = 423, "Interval Too Brief"
    """The expiration time of the resource refreshed by the request is too short."""

    TEMPORARILY_UNAVAILABLE = 480, "Temporarily Unavailable"
    """The callee's end system was contacted successfully, but the callee is currently unavailable."""

    CALL_TRANSACTION_DOES_NOT_EXIST = 481, "Call/Transaction Does Not Exist"
    """The UAS received a request that does not match any existing dialog or transaction."""

    LOOP_DETECTED = 482, "Loop Detected"
    """The server has detected a loop."""

    TOO_MANY_HOPS = 483, "Too Many Hops"
    """The server received a request that contains a Max-Forwards header field with the value zero."""

    ADDRESS_INCOMPLETE = 484, "Address Incomplete"
    """The Request-URI of the received request was incomplete."""

    AMBIGUOUS = 485, "Ambiguous"
    """The Request-URI was ambiguous."""

    BUSY_HERE = 486, "Busy Here"
    """The callee's end system was contacted successfully, but the callee is not willing or able to take additional calls at this end system."""

    REQUEST_TERMINATED = 487, "Request Terminated"
    """The request was terminated by a BYE or CANCEL request."""

    NOT_ACCEPTABLE_HERE = 488, "Not Acceptable Here"
    """Same meaning as 606 (Not Acceptable), but it only applies to the specific resource addressed by the Request-URI."""

    REQUEST_PENDING = 491, "Request Pending"
    """The request was received by a UAS that had a pending request within the same dialog."""

    UNDECIPHERABLE = 493, "Undecipherable"
    """The request contained an encrypted MIME body for which the recipient does not possess or will not provide an appropriate decryption key."""

    SERVER_INTERNAL_ERROR = 500, "Server Internal Error"
    """The server encountered an unexpected condition which prevented it from fulfilling the request."""

    NOT_IMPLEMENTED = 501, "Not Implemented"
    """The server does not support the functionality required to fulfill the request."""

    BAD_GATEWAY = 502, "Bad Gateway"
    """The server, while acting as a gateway or proxy, received an invalid response from the downstream server it accessed in attempting to fulfill the request."""

    SERVICE_UNAVAILABLE = 503, "Service Unavailable"
    """The server is temporarily unable to process the request due to a temporary overloading or maintenance of the server."""

    SERVER_TIME_OUT = 504, "Server Time-out"
    """The server did not receive a timely response from an external server it accessed in attempting to process the request."""

    VERSION_NOT_SUPPORTED = 505, "Version Not Supported"
    """The server does not support, or refuses to support, the SIP protocol version that was used in the request."""

    MESSAGE_TOO_LARGE = 513, "Message Too Large"
    """The server was unable to process the request since the message length exceeded its capabilities."""

    BUSY_EVERYWHERE = 600, "Busy Everywhere"
    """The callee's end system was contacted successfully, but the callee is busy and does not wish to take the call at this time."""

    DECLINE = 603, "Decline"
    """The callee's machine was contacted successfully, but the user explicitly does not wish to or cannot participate."""

    DOES_NOT_EXIST_ANYWHERE = 604, "Does Not Exist Anywhere"
    """The server has authoritative information that the user indicated in the Request-URI does not exist anywhere."""

    NOT_ACCEPTABLE_ANYWHERE = 606, "Not Acceptable"
    """The user's agent was contacted successfully, but some aspects of the session description were not acceptable."""

ADDRESS_INCOMPLETE = (484, 'Address Incomplete') class-attribute instance-attribute

The Request-URI of the received request was incomplete.

ALTERNATIVE_SERVICE = (380, 'Alternative Service') class-attribute instance-attribute

The server has fulfilled a request for the service indicated by the URI.

AMBIGUOUS = (485, 'Ambiguous') class-attribute instance-attribute

This status code indicates that the server cannot decide on a response to the request because multiple responses are possible.

BAD_EXTENSION = (420, 'Bad Extension') class-attribute instance-attribute

This status code indicates that the server does not recognize the value of any of the parameters that it needs to understand in the request.

BAD_GATEWAY = (502, 'Bad Gateway') class-attribute instance-attribute

The server, while acting as a gateway or proxy, received an invalid response from the upstream server it accessed in attempting to fulfill the request.

BAD_REQUEST = (400, 'Bad Request') class-attribute instance-attribute

The request has bad syntax or cannot be fulfilled due to bad syntax.

BUSY_EVERYWHERE = (600, 'Busy Everywhere') class-attribute instance-attribute

The server is not able to process the request because it is busy. For example, this error might be given if a server is overloaded with requests and is unable to process one of the requests.

BUSY_HERE = (486, 'Busy Here') class-attribute instance-attribute

This status code indicates that the server is busy here.

CALL_IS_BEING_FORWARDED = (181, 'Call Is Being Forwarded') class-attribute instance-attribute

The called party is being alerted of the call, but the call is not yet established.

CALL_TRANSACTION_DOES_NOT_EXIST = (481, 'Call/Transaction Does Not Exist') class-attribute instance-attribute

The UAS received a request that does not match any existing dialog or transaction.

DECLINE = (603, 'Decline') class-attribute instance-attribute

The call has been declined.

DOES_NOT_EXIST_ANYWHERE = (604, 'Does Not Exist Anywhere') class-attribute instance-attribute

The server has authoritative information that the user indicated in the Request-URI does not exist anywhere.

EXTENSION_REQUIRED = (421, 'Extension Required') class-attribute instance-attribute

The UAS needs a particular extension to process the request, but this extension is not listed in a Supported header field in the request.

FORBIDDEN = (403, 'Forbidden') class-attribute instance-attribute

The server understood the request but refuses to fulfill it.

GONE = (410, 'Gone') class-attribute instance-attribute

The requested resource is no longer available at the server and no longer exists.

INTERVAL_TOO_BRIEF = (423, 'Interval Too Brief') class-attribute instance-attribute

The server is rejecting the request because the expiration time of the resource refreshed by the request is too short.

LOOP_DETECTED = (482, 'Loop Detected') class-attribute instance-attribute

This status code indicates that the server has detected an infinite loop while processing the request.

MESSAGE_TOO_LARGE = (513, 'Message Too Large') class-attribute instance-attribute

The server is unwilling to process the request because its header fields are too large.

METHOD_NOT_ALLOWED = (405, 'Method Not Allowed') class-attribute instance-attribute

The method specified in the Request-URI is not allowed for the resource identified by the request URI.

MOVED_PERMANENTLY = (301, 'Moved Permanently') class-attribute instance-attribute

The requested resource has been assigned a new permanent URI and any future references to this resource ought to use one of the returned URIs.

MOVED_TEMPORARILY = (302, 'Moved Temporarily') class-attribute instance-attribute

The requested resource is temporarily unavailable and the server is asking the client to try again later.

MULTIPLE_CHOICES = (300, 'Multiple Choices') class-attribute instance-attribute

The requested resource has multiple representations, each with its own specific location.

NOT_ACCEPTABLE = (406, 'Not Acceptable') class-attribute instance-attribute

The server cannot produce a response matching the Accept headers.

NOT_ACCEPTABLE_ANYWHERE = (606, 'Not Acceptable') class-attribute instance-attribute

The server is not able to produce a response which is acceptable to the client, according to the proactive negotiation header fields received in the request, and the server is unwilling to supply a default reason phrase.

NOT_ACCEPTABLE_HERE = (488, 'Not Acceptable Here') class-attribute instance-attribute

This status code indicates that the server is not able to produce a response which is acceptable to the client, according to the proactive negotiation header fields received in the request, and the server is unwilling to supply a default reason phrase.

NOT_FOUND = (404, 'Not Found') class-attribute instance-attribute

The requested resource could not be found.

NOT_IMPLEMENTED = (501, 'Not Implemented') class-attribute instance-attribute

The server does not support the functionality required to fulfill the request.

OK = (200, 'OK') class-attribute instance-attribute

The request has succeeded.

PAYMENT_REQUIRED = (402, 'Payment Required') class-attribute instance-attribute

Further action is required.

PROXY_AUTHENTICATION_REQUIRED = (407, 'Proxy Authentication Required') class-attribute instance-attribute

The client must authenticate itself with the proxy.

QUEUED = (182, 'Queued') class-attribute instance-attribute

The called party is being alerted of the call, but the call is not yet established.

REQUEST_ENTITY_TOO_LARGE = (413, 'Request Entity Too Large') class-attribute instance-attribute

The server will not accept the request, because the entity of the request is too large.

REQUEST_PENDING = (491, 'Request Pending') class-attribute instance-attribute

The request was received by a UAS that had a pending request within the same dialog.

REQUEST_TERMINATED = (487, 'Request Terminated') class-attribute instance-attribute

The request was terminated by a BYE or CANCEL request.

REQUEST_TIMEOUT = (408, 'Request Timeout') class-attribute instance-attribute

The server timed out waiting for the request.

REQUEST_URI_TOO_LONG = (414, 'Request-URI Too Long') class-attribute instance-attribute

The server will not accept the request, because the Request-URI is too long.

RINGING = (180, 'Ringing') class-attribute instance-attribute

The called party is being alerted of the call.

SERVER_INTERNAL_ERROR = (500, 'Server Internal Error') class-attribute instance-attribute

The server encountered an unexpected condition which prevented it from fulfilling the request.

SERVER_TIME_OUT = (504, 'Server Time-out') class-attribute instance-attribute

The server, while acting as a gateway or proxy, did not receive a timely response from the upstream server specified by the URI (e.g., HTTP, FTP, LDAP) or some other auxiliary server (e.g., DNS) it needed to access in attempting to complete the request.

SERVICE_UNAVAILABLE = (503, 'Service Unavailable') class-attribute instance-attribute

The server is currently unable to handle the request due to a temporary overloading or maintenance of the server.

SESSION_PROGRESS = (183, 'Session Progress') class-attribute instance-attribute

The called party is being alerted of the call, but the call is not yet established.

TEMPORARILY_UNAVAILABLE = (480, 'Temporarily Unavailable') class-attribute instance-attribute

This status code indicates that the server is currently unable to handle the request due to a temporary overloading or maintenance of the server.

TOO_MANY_HOPS = (483, 'Too Many Hops') class-attribute instance-attribute

This status code indicates that the server has exceeded the maximum number of hops allowed in the request URI.

TRYING = (100, 'Trying') class-attribute instance-attribute

The request is being processed. No final response is available yet.

UNAUTHORIZED = (401, 'Unauthorized') class-attribute instance-attribute

The request requires user authentication.

UNDECIPHERABLE = (493, 'Undecipherable') class-attribute instance-attribute

This status code indicates that the server was unable to decrypt a message after performing the necessary decryption(s).

UNSUPPORTED_MEDIA_TYPE = (415, 'Unsupported Media Type') class-attribute instance-attribute

The server will not accept the request, because the media type of the request is unsupported.

UNSUPPORTED_URI_SCHEME = (416, 'Unsupported URI Scheme') class-attribute instance-attribute

The server will not accept the request, because the URI scheme of the request is unsupported.

USE_PROXY = (305, 'Use Proxy') class-attribute instance-attribute

The requested resource is available only through a proxy, the address for which is provided in the response.

VERSION_NOT_SUPPORTED = (505, 'Version Not Supported') class-attribute instance-attribute

The server does not support, or refuses to support, the protocol version that was used in the request message.

voip.sdp.types

SDP field types as defined by RFC 4566.

Attribute dataclass

Bases: ByteSerializableObject

Attribute field (a=) as defined by RFC 4566 §5.13.

Source code in voip/sdp/types.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@dataclasses.dataclass(frozen=True, slots=True)
class Attribute(ByteSerializableObject):
    """SDP attribute line (``a=``), see RFC 4566 §5.13.

    An attribute is either a bare flag (``name``) or a name/value pair
    (``name:value``).
    """

    letter: ClassVar[str] = "a"
    session_attr: ClassVar[str] = "attributes"
    is_list: ClassVar[bool] = True
    media_attr: ClassVar[str | None] = "attributes"

    name: str
    value: str | None = None

    def __bytes__(self) -> bytes:
        """Encode as ``name`` when there is no value, else ``name:value``."""
        if self.value is None:
            return self.name.encode()
        return f"{self.name}:{self.value}".encode()

    @classmethod
    def parse(cls, data: bytes | str) -> Attribute:
        """Build an attribute from the text that follows ``a=``.

        The input is split at the first colon.  A missing or empty value
        part is stored as ``None``.
        """
        text = data.decode() if isinstance(data, bytes) else data
        key, _sep, remainder = text.partition(":")
        return cls(name=key, value=remainder if remainder else None)

Bandwidth dataclass

Bases: ByteSerializableObject

Bandwidth field (b=) as defined by RFC 4566 §5.8.

Source code in voip/sdp/types.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@dataclasses.dataclass(frozen=True, slots=True)
class Bandwidth(ByteSerializableObject):
    """SDP bandwidth line (``b=``), see RFC 4566 §5.8."""

    letter: ClassVar[str] = "b"
    session_attr: ClassVar[str] = "bandwidths"
    is_list: ClassVar[bool] = True
    media_attr: ClassVar[str | None] = "bandwidths"

    bwtype: str
    bandwidth: int

    def __bytes__(self) -> bytes:
        """Encode as ``bwtype:bandwidth``."""
        line = f"{self.bwtype}:{self.bandwidth}"
        return line.encode()

    @classmethod
    def parse(cls, data: bytes | str) -> Bandwidth:
        """Build a bandwidth entry from the text that follows ``b=``.

        The input is split at the first colon and the right-hand side is
        converted with ``int()``, which raises ``ValueError`` when it is not
        an integer.
        """
        text = data.decode() if isinstance(data, bytes) else data
        kind, _sep, amount = text.partition(":")
        return cls(bwtype=kind, bandwidth=int(amount))

ConnectionData dataclass

Bases: ByteSerializableObject

Connection data field (c=) as defined by RFC 4566 §5.7.

Source code in voip/sdp/types.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@dataclasses.dataclass(frozen=True, slots=True)
class ConnectionData(ByteSerializableObject):
    """SDP connection data line (``c=``), see RFC 4566 §5.7."""

    letter: ClassVar[str] = "c"
    session_attr: ClassVar[str] = "connection"
    is_list: ClassVar[bool] = False
    media_attr: ClassVar[str | None] = "connection"

    nettype: str
    addrtype: str
    connection_address: str

    def __bytes__(self) -> bytes:
        """Encode the three fields separated by single spaces."""
        line = f"{self.nettype} {self.addrtype} {self.connection_address}"
        return line.encode()

    @classmethod
    def parse(cls, data: bytes | str) -> ConnectionData:
        """Build connection data from the text that follows ``c=``.

        The input is split on single spaces into exactly three parts;
        ``ValueError`` is raised when fewer are present.
        """
        text = data.decode() if isinstance(data, bytes) else data
        pieces = text.split(" ", 2)
        net, addr, address = pieces
        return cls(nettype=net, addrtype=addr, connection_address=address)

Field

Bases: Protocol

SDP field descriptor protocol.

Source code in voip/sdp/types.py
25
26
27
28
29
30
31
32
33
34
35
36
@runtime_checkable
class Field(Protocol):
    """SDP field descriptor protocol."""

    letter: str
    session_attr: str
    is_list: bool
    media_attr: str | None

    @staticmethod
    def parse(value: str) -> object:
        """Parse a raw SDP line value."""
parse(value) staticmethod

Parse a raw SDP line value.

Source code in voip/sdp/types.py
34
35
36
@staticmethod
def parse(value: str) -> object:
    """Parse a raw SDP line value."""

IntField dataclass

Descriptor for SDP fields that parse and serialize as integers.

Source code in voip/sdp/types.py
53
54
55
56
57
58
59
60
61
62
63
64
@dataclasses.dataclass(slots=True, frozen=True)
class IntField:
    """Descriptor for SDP fields that parse and serialize as integers."""

    letter: str
    session_attr: str
    is_list: bool = False
    media_attr: str | None = None

    @staticmethod
    def parse(value: str) -> int:
        return int(value)

MediaDescription dataclass

Bases: ByteSerializableObject

Media description section (m=) as defined by RFC 4566 §5.14.

Source code in voip/sdp/types.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
@dataclasses.dataclass(slots=True)
class MediaDescription(ByteSerializableObject):
    """One SDP media section (``m=`` and its sub-fields), RFC 4566 §5.14."""

    letter: ClassVar[str] = "m"
    session_attr: ClassVar[str] = "media"
    is_list: ClassVar[bool] = True
    media_attr: ClassVar[str | None] = None

    media: str
    port: int
    proto: str
    fmt: list[RTPPayloadFormat]
    title: str | None = None
    connection: ConnectionData | None = None
    bandwidths: list[Bandwidth] = dataclasses.field(default_factory=list)
    attributes: list[Attribute] = dataclasses.field(default_factory=list)

    def get_format(self, pt: int | str) -> RTPPayloadFormat | None:
        """Find the entry in `fmt` whose payload type equals *pt*.

        *pt* is converted with ``int()`` first.  Returns the first matching
        `RTPPayloadFormat`, or ``None`` when there is none.
        """
        wanted = int(pt)
        for candidate in self.fmt:
            if candidate.payload_type == wanted:
                return candidate
        return None

    def apply_attribute(self, attr: Attribute) -> bool:
        """Merge an ``a=rtpmap`` or ``a=fmtp`` attribute into `fmt`.

        Returns ``True`` when the attribute was consumed.  ``False`` tells
        the caller the attribute was not handled here; `parse` then keeps it
        in `attributes`.
        """
        if attr.value is None:
            # Value-less attributes are never rtpmap/fmtp payload data.
            return False
        if attr.name == "rtpmap":
            parsed = RTPPayloadFormat.parse(attr.value)
            for index, existing in enumerate(self.fmt):
                if existing.payload_type != parsed.payload_type:
                    continue
                # Keep an fmtp that arrived before its rtpmap line.
                if parsed.fmtp is None and existing.fmtp is not None:
                    parsed.fmtp = existing.fmtp
                self.fmt[index] = parsed
                break
            # rtpmap is consumed even when no payload type matched.
            return True
        if attr.name == "fmtp":
            pt_text, _sep, params = attr.value.partition(" ")
            try:
                wanted = int(pt_text)
            except ValueError:
                return False
            for existing in self.fmt:
                if existing.payload_type == wanted:
                    existing.fmtp = params
                    return True
        return False

    def _lines(self) -> Generator[str]:
        """Yield the section's SDP lines: m, i, c, b, then a."""
        payload_types = " ".join(str(entry.payload_type) for entry in self.fmt)
        yield f"m={self.media} {self.port} {self.proto} {payload_types}"
        if isinstance(self.title, str):
            yield f"i={self.title}"
        if isinstance(self.connection, ConnectionData):
            yield f"c={self.connection}"
        for bandwidth in self.bandwidths:
            yield f"b={bandwidth}"
        for entry in self.fmt:
            # rtpmap is only emitted once both codec name and rate are known.
            if isinstance(entry.encoding_name, str) and isinstance(
                entry.sample_rate, int
            ):
                yield f"a=rtpmap:{entry}"
            if isinstance(entry.fmtp, str):
                yield f"a=fmtp:{entry.payload_type} {entry.fmtp}"
        for attribute in self.attributes:
            yield f"a={attribute}"

    def __bytes__(self) -> bytes:
        """Encode all lines joined with CRLF (no trailing line break)."""
        text = "\r\n".join(self._lines())
        return text.encode()

    @classmethod
    def parse(cls, data: bytes | str) -> MediaDescription:
        """Build a media section from its ``m=`` line and following lines.

        The first line supplies media type, port, protocol and the payload
        type numbers.  Subsequent ``i=``, ``c=``, ``b=`` and ``a=`` lines are
        applied in order; lines without ``=`` and other line types are
        ignored.
        """
        text = data.decode() if isinstance(data, bytes) else data
        rows = text.splitlines()
        header = rows[0].rstrip("\r").removeprefix("m=")
        media_type, port_text, proto, *pt_tokens = header.split()
        formats = [RTPPayloadFormat.from_pt(int(token)) for token in pt_tokens]
        section = cls(
            media=media_type, port=int(port_text), proto=proto, fmt=formats
        )
        for raw in rows[1:]:
            row = raw.rstrip("\r")
            # An empty row contains no "=", so this also skips blank lines.
            if "=" not in row:
                continue
            kind, _sep, payload = row.partition("=")
            if kind == "i":
                section.title = payload
            elif kind == "c":
                section.connection = ConnectionData.parse(payload)
            elif kind == "b":
                section.bandwidths.append(Bandwidth.parse(payload))
            elif kind == "a":
                attribute = Attribute.parse(payload)
                if not section.apply_attribute(attribute):
                    section.attributes.append(attribute)
        return section
apply_attribute(attr)

Apply a media-level a= attribute, returning True if consumed.

Handles a=rtpmap and a=fmtp by updating the matching RTPPayloadFormat entry. Other attributes go to attributes.

Source code in voip/sdp/types.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def apply_attribute(self, attr: Attribute) -> bool:
    """Merge an ``a=rtpmap`` or ``a=fmtp`` attribute into `fmt`.

    Returns ``True`` when the attribute was consumed and ``False`` when it
    was not handled here.
    """
    if attr.value is None:
        # Value-less attributes are never rtpmap/fmtp payload data.
        return False
    if attr.name == "rtpmap":
        parsed = RTPPayloadFormat.parse(attr.value)
        for index, existing in enumerate(self.fmt):
            if existing.payload_type != parsed.payload_type:
                continue
            # Keep an fmtp that arrived before its rtpmap line.
            if parsed.fmtp is None and existing.fmtp is not None:
                parsed.fmtp = existing.fmtp
            self.fmt[index] = parsed
            break
        # rtpmap is consumed even when no payload type matched.
        return True
    if attr.name == "fmtp":
        pt_text, _sep, params = attr.value.partition(" ")
        try:
            wanted = int(pt_text)
        except ValueError:
            return False
        for existing in self.fmt:
            if existing.payload_type == wanted:
                existing.fmtp = params
                return True
    return False
get_format(pt)

Return the RTPPayloadFormat for payload type pt, or None.

Source code in voip/sdp/types.py
353
354
355
356
def get_format(self, pt: int | str) -> RTPPayloadFormat | None:
    """Find the entry in `fmt` whose payload type equals *pt*.

    *pt* is converted with ``int()`` first.  Returns the first match, or
    ``None`` when there is none.
    """
    wanted = int(pt)
    for candidate in self.fmt:
        if candidate.payload_type == wanted:
            return candidate
    return None

Origin dataclass

Bases: ByteSerializableObject

Origin field (o=) as defined by RFC 4566 §5.2.

Source code in voip/sdp/types.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@dataclasses.dataclass(frozen=True, slots=True)
class Origin(ByteSerializableObject):
    """SDP origin line (``o=``), see RFC 4566 §5.2."""

    letter: ClassVar[str] = "o"
    session_attr: ClassVar[str] = "origin"
    is_list: ClassVar[bool] = False
    media_attr: ClassVar[str | None] = None

    username: str
    sess_id: str
    sess_version: str
    nettype: str
    addrtype: str
    unicast_address: str

    def __bytes__(self) -> bytes:
        """Encode the six fields, in declaration order, separated by spaces."""
        ordered = (
            self.username,
            self.sess_id,
            self.sess_version,
            self.nettype,
            self.addrtype,
            self.unicast_address,
        )
        return " ".join(f"{part}" for part in ordered).encode()

    @classmethod
    def parse(cls, data: bytes | str) -> Origin:
        """Build an origin from the text that follows ``o=``.

        The input is split on single spaces into exactly six parts;
        ``ValueError`` is raised when fewer are present.
        """
        text = data.decode() if isinstance(data, bytes) else data
        pieces = text.split(" ", 5)
        user, session_id, version, net, addr, address = pieces
        return cls(
            username=user,
            sess_id=session_id,
            sess_version=version,
            nettype=net,
            addrtype=addr,
            unicast_address=address,
        )

PayloadTypeSpec

Bases: NamedTuple

Typed specification for a static RTP payload type (RFC 3551 §6).

Source code in voip/sdp/types.py
202
203
204
205
206
207
208
209
210
class PayloadTypeSpec(NamedTuple):
    """Immutable record describing one static RTP payload type (RFC 3551 §6).

    Positional order is: payload type number, sample rate, encoding name,
    channel count, frame size.  The last two are optional.
    """

    pt: int
    sample_rate: int
    encoding_name: str
    channels: int = 1
    #: Samples per standard 20 ms RTP frame; 0 = variable or not applicable.
    frame_size: int = 0

RTPPayloadFormat dataclass

Bases: ByteSerializableObject

RTP payload format descriptor (RFC 3551 §6 / RFC 4566 §6).

Codec parameters from a=rtpmap are merged in by the SDP parser. Static payload types fall back to the StaticPayloadType table. Dynamic payload types (PT ≥ 96) require an explicit a=rtpmap.

Serialises to the a=rtpmap value when codec fields are present.

Source code in voip/sdp/types.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
@dataclasses.dataclass(slots=True)
class RTPPayloadFormat(ByteSerializableObject):
    """RTP payload format descriptor (RFC 3551 §6 / RFC 4566 §6).

    Codec parameters from ``a=rtpmap`` are merged in by the SDP parser.
    Static payload types fall back to the `StaticPayloadType` table.
    Dynamic payload types (PT ≥ 96) require an explicit ``a=rtpmap``.

    Serialises to the ``a=rtpmap`` value when codec fields are present.
    """

    payload_type: int
    fmtp: str | None = None
    encoding_name: str | None = None
    channels: int = 1
    sample_rate: int | None = None

    def __post_init__(self) -> None:
        """Fill unset codec fields from the static payload type table."""
        try:
            default = StaticPayloadType.from_pt(self.payload_type)
        except ValueError:
            # Not a static payload type: keep the fields exactly as given.
            pass
        else:
            self.sample_rate = self.sample_rate or default.sample_rate
            self.encoding_name = self.encoding_name or default.encoding_name
            # `channels` defaults to 1, which is truthy, so the table value
            # is only used here when a caller passes 0.  `from_pt` passes the
            # table's channel count explicitly for that reason.
            self.channels = self.channels or default.channels

    def __bytes__(self) -> bytes:
        """Encode as ``<pt> <name>/<rate>``, adding ``/<channels>`` unless 1."""
        base = f"{self.payload_type} {self.encoding_name}/{self.sample_rate}"
        if self.channels == 1:
            return base.encode()
        return f"{base}/{self.channels}".encode()

    @classmethod
    def parse(cls, data: bytes | str) -> RTPPayloadFormat:
        """Build a format from an ``a=rtpmap`` value.

        The expected shape is ``<pt> <name>/<rate>[/<channels>]``; a missing
        channel part is read as 1.

        Raises:
            ValueError: If the ``<name>/<rate>`` part is missing or a numeric
                part is not an integer.
        """
        value = data.decode() if isinstance(data, bytes) else data
        fmt, _, rest = value.partition(" ")
        parts = rest.split("/")
        if len(parts) < 2:
            raise ValueError(f"Invalid rtpmap value: {value!r}")
        return cls(
            payload_type=int(fmt),
            encoding_name=parts[0],
            sample_rate=int(parts[1]),
            channels=int(parts[2]) if len(parts) > 2 else 1,
        )

    @classmethod
    def from_pt(cls, pt: int) -> RTPPayloadFormat:
        """Create an `RTPPayloadFormat` from a payload type number.

        For a static payload type the channel count is taken from
        `StaticPayloadType`, so multi-channel entries such as PT 10 keep
        their channel count instead of the constructor default of 1.
        Other payload types get the constructor defaults.
        """
        try:
            spec = StaticPayloadType.from_pt(pt)
        except ValueError:
            # No static table entry: codec fields stay unset until rtpmap.
            return cls(payload_type=pt)
        return cls(payload_type=pt, channels=spec.channels)

    @property
    def frame_size(self) -> int:
        """Samples per standard 20 ms RTP frame.

        For static payload types the value comes from `StaticPayloadType`
        when the table entry has a non-zero frame size.  Otherwise it is
        derived from `sample_rate` (8000 when unset) assuming a 20 ms
        packetisation interval.
        """
        try:
            spec = StaticPayloadType.from_pt(self.payload_type)
        except ValueError:
            pass
        else:
            if spec.frame_size:
                return spec.frame_size
        return (self.sample_rate or 8000) * 20 // 1000
frame_size property

Samples per standard 20 ms RTP frame.

For static payload types the value comes from StaticPayloadType. For dynamic payload types (e.g. Opus, PT ≥ 96) it is derived from sample_rate assuming a 20 ms packetisation interval.

from_pt(pt) classmethod

Create an RTPPayloadFormat from a payload type number.

Source code in voip/sdp/types.py
313
314
315
316
@classmethod
def from_pt(cls, pt: int) -> RTPPayloadFormat:
    """Create an `RTPPayloadFormat` from a payload type number.

    For a static payload type the channel count is taken from
    `StaticPayloadType`, so multi-channel entries such as PT 10 keep their
    channel count instead of the constructor default of 1.  Other payload
    types get the constructor defaults.
    """
    try:
        spec = StaticPayloadType.from_pt(pt)
    except ValueError:
        # No static table entry: codec fields stay unset until rtpmap.
        return cls(payload_type=pt)
    return cls(payload_type=pt, channels=spec.channels)

StaticPayloadType

Bases: PayloadTypeSpec, Enum

Static RTP payload types as defined by RFC 3551 §6.

Each member's value is a PayloadTypeSpec carrying the payload type number, sample rate, canonical encoding name, channel count, and frame size. Use from_pt to look up a member by its PT number.

Source code in voip/sdp/types.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class StaticPayloadType(PayloadTypeSpec, enum.Enum):
    """Table of the static RTP payload types from RFC 3551 §6.

    Every member wraps a `PayloadTypeSpec` holding the payload type number,
    sample rate, canonical encoding name, channel count and frame size.
    `from_pt` resolves a PT number to its member.
    """

    #: G.711 µ-law
    PCMU = PayloadTypeSpec(0, 8000, "PCMU", frame_size=160)
    GSM = PayloadTypeSpec(3, 8000, "GSM", frame_size=160)
    G723 = PayloadTypeSpec(4, 8000, "G723", frame_size=160)
    #: DVI4 at 8 kHz
    DVI4_8K = PayloadTypeSpec(5, 8000, "DVI4", frame_size=160)
    #: DVI4 at 16 kHz
    DVI4_16K = PayloadTypeSpec(6, 16000, "DVI4", frame_size=320)
    LPC = PayloadTypeSpec(7, 8000, "LPC", frame_size=160)
    #: G.711 A-law
    PCMA = PayloadTypeSpec(8, 8000, "PCMA", frame_size=160)
    #: RTP clock rate is 8000 per RFC 3551 even though wideband
    G722 = PayloadTypeSpec(9, 8000, "G722", frame_size=160)
    L16_STEREO = PayloadTypeSpec(10, 44100, "L16", 2, frame_size=882)
    L16_MONO = PayloadTypeSpec(11, 44100, "L16", frame_size=882)
    QCELP = PayloadTypeSpec(12, 8000, "QCELP", frame_size=160)
    CN = PayloadTypeSpec(13, 8000, "CN", frame_size=160)
    MPA = PayloadTypeSpec(14, 90000, "MPA")
    G728 = PayloadTypeSpec(15, 8000, "G728", frame_size=160)
    #: DVI4 at 11.025 kHz
    DVI4_11K = PayloadTypeSpec(16, 11025, "DVI4", frame_size=220)
    #: DVI4 at 22.05 kHz
    DVI4_22K = PayloadTypeSpec(17, 22050, "DVI4", frame_size=441)
    G729 = PayloadTypeSpec(18, 8000, "G729", frame_size=160)
    CELB = PayloadTypeSpec(25, 90000, "CelB")
    JPEG = PayloadTypeSpec(26, 90000, "JPEG")
    NV = PayloadTypeSpec(28, 90000, "nv")
    H261 = PayloadTypeSpec(31, 90000, "H261")
    #: MPEG-1 and MPEG-2 video
    MPV = PayloadTypeSpec(32, 90000, "MPV")
    #: MPEG-2 transport stream
    MP2T = PayloadTypeSpec(33, 90000, "MP2T")
    H263 = PayloadTypeSpec(34, 90000, "H263")

    @classmethod
    def from_pt(cls, pt: int) -> StaticPayloadType:
        """Return the member whose PT number equals *pt*.

        Raises:
            ValueError: If no member has that PT number.
        """
        found = next((member for member in cls if member.value.pt == pt), None)
        if found is None:
            raise ValueError(f"No static payload type with PT {pt}")
        return found
from_pt(pt) classmethod

Look up a static payload type by its PT number.

Source code in voip/sdp/types.py
255
256
257
258
259
260
261
@classmethod
def from_pt(cls, pt: int) -> StaticPayloadType:
    """Return the member whose PT number equals *pt*.

    Raises:
        ValueError: If no member has that PT number.
    """
    found = next((member for member in cls if member.value.pt == pt), None)
    if found is None:
        raise ValueError(f"No static payload type with PT {pt}")
    return found

StrField dataclass

Descriptor for SDP fields that parse and serialize as plain strings.

Source code in voip/sdp/types.py
39
40
41
42
43
44
45
46
47
48
49
50
@dataclasses.dataclass(slots=True, frozen=True)
class StrField:
    """Descriptor for SDP fields that parse and serialize as plain strings."""

    letter: str
    session_attr: str
    is_list: bool = False
    media_attr: str | None = None

    @staticmethod
    def parse(value: str) -> str:
        return value

Timing dataclass

Bases: ByteSerializableObject

Timing field (t=) as defined by RFC 4566 §5.9.

Source code in voip/sdp/types.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@dataclasses.dataclass(frozen=True, slots=True)
class Timing(ByteSerializableObject):
    """SDP timing line (``t=``), see RFC 4566 §5.9."""

    letter: ClassVar[str] = "t"
    session_attr: ClassVar[str] = "timings"
    is_list: ClassVar[bool] = True
    media_attr: ClassVar[str | None] = None

    start_time: int
    stop_time: int

    def __bytes__(self) -> bytes:
        """Encode as ``<start_time> <stop_time>``."""
        line = f"{self.start_time} {self.stop_time}"
        return line.encode()

    @classmethod
    def parse(cls, data: bytes | str) -> Timing:
        """Build a timing entry from the text that follows ``t=``.

        The input is split at the first space and both halves are converted
        with ``int()``.  ``ValueError`` is raised when there is no space or
        a half is not an integer.
        """
        text = data.decode() if isinstance(data, bytes) else data
        begin, end = text.split(" ", 1)
        return cls(start_time=int(begin), stop_time=int(end))