Multimedia Sessions

Session and its subclasses handle the media exchange between call parties. They are created by the Dialog when a call is accepted or initiated.

Sessions can carry audio, video, and other media. However, this library currently provides only audio sessions via the AudioCall class. Video and other media types are fairly uncommon outside of consumer applications, so implementing them is on the roadmap but not yet a priority.

voip.rtp.Session dataclass

One call leg managed by the RTP multiplexer.

Associates a SIP dialog with the RealtimeTransportProtocol media stream. Subclass and override packet_received to process incoming media, and use send_packet to transmit outbound media.

The rtp back-reference allows sending media; the dialog back-reference carries the SIP dialog state and a reference to the SIP session (dialog.sip) so that the transport can be closed when the call ends.

Subclass voip.audio.AudioCall for audio calls with codec negotiation, buffering, and decoding.

Attributes:

Name Type Description
rtp RealtimeTransportProtocol

Shared RTP multiplexer socket that delivers packets to this handler.

dialog Dialog

SIP dialog state for this call leg.

media MediaDescription

Negotiated SDP media description for this call leg.

caller CallerID

Caller identifier as received in the SIP From header.

srtp SRTPSession | None

Optional SRTP session for encrypting and decrypting media.

Source code in voip/rtp.py
@dataclasses.dataclass
class Session:
    """One call leg managed by the RTP multiplexer.

    Associates a SIP dialog with the `RealtimeTransportProtocol` media
    stream. Subclass and override `packet_received` to process incoming
    media, and use `send_packet` to transmit outbound media.

    The `rtp` back-reference allows sending media; the `dialog` back-reference
    carries the SIP dialog state and a reference to the SIP session
    (``dialog.sip``) so that the transport can be closed when the call ends.

    Subclass `voip.audio.AudioCall` for audio calls with codec
    negotiation, buffering, and decoding.

    Attributes:
        rtp: Shared RTP multiplexer socket that delivers packets to this handler.
        dialog: SIP dialog state for this call leg.
        media: Negotiated SDP media description for this call leg.
        caller: Caller identifier as received in the SIP From header.
        srtp: Optional SRTP session for encrypting and decrypting media.
    """

    rtp: RealtimeTransportProtocol
    dialog: Dialog
    media: MediaDescription
    caller: CallerID
    srtp: SRTPSession | None = None

    def packet_received(self, packet: RTPPacket, addr: NetworkAddress) -> None:
        """Handle a parsed RTP packet. Override in subclasses to process media.

        Args:
            packet: Parsed RTP packet.
            addr: Remote ``(host, port)`` the packet arrived from.
        """

    def send_packet(self, packet: RTPPacket, addr: NetworkAddress) -> None:
        """Serialize *packet* and send it via the shared RTP socket.

        Encrypts the packet with the call's SRTP session when one is set.

        Args:
            packet: RTP packet to send.
            addr: Destination ``(host, port)``.
        """
        data = bytes(packet)
        if self.srtp is not None:
            data = self.srtp.encrypt(data)
        self.rtp.send(data, addr)

    async def hang_up(self) -> None:
        """
        Terminate the call by sending a SIP BYE request [RFC 3261 §15].

        Deregisters this call from the RTP multiplexer, then delegates the
        BYE signaling to [Dialog.bye][voip.sip.Dialog.bye], which
        constructs and sends the BYE request, removes the dialog from the
        SIP session's registry, and awaits the 200 OK acknowledgment.

        The method is a no-op when no dialog is associated with this call.

        [RFC 3261 §15]: https://datatracker.ietf.org/doc/html/rfc3261#section-15
        """
        if self.dialog is None:
            return
        # Deregister the RTP handler for this call so no further media is
        # dispatched while the BYE is in flight.
        _not_found = object()
        remote_addr = next(
            (addr for addr, call in self.rtp.calls.items() if call is self),
            _not_found,
        )
        if remote_addr is not _not_found:
            self.rtp.unregister_call(remote_addr)
        await self.dialog.bye()

    @classmethod
    def negotiate_codec(cls, remote_media: MediaDescription) -> MediaDescription:
        """Negotiate a media codec from the remote SDP offer.

        Override in subclasses to implement codec selection. The SIP layer
        calls this before sending a 200 OK; if the method raises the exception
        propagates and the call is not answered.

        Args:
            remote_media: The SDP ``m=audio`` section from the remote INVITE.

        Returns:
            A `MediaDescription` with the chosen codec.

        Raises:
            NotImplementedError: When not overridden by a subclass.
        """
        raise NotImplementedError(
            f"{cls.__name__} does not implement negotiate_codec. "
            "Override this classmethod in a subclass (e.g. AudioCall) to "
            "support codec negotiation."
        )

    @classmethod
    def sdp_formats(cls) -> list[RTPPayloadFormat]:
        """Return the list of supported payload formats for outbound SDP offers.

        Override in subclasses to advertise codec capabilities.
        [AudioCall][voip.audio.AudioCall] overrides this to return all
        supported codecs in priority order.

        Returns:
            List of [RTPPayloadFormat][voip.sdp.types.RTPPayloadFormat]
            objects describing the supported codecs.
        """
        from voip.sdp.types import StaticPayloadType  # noqa: PLC0415

        return [RTPPayloadFormat.from_pt(StaticPayloadType.PCMU.pt)]

hang_up() async

Terminate the call by sending a SIP BYE request (RFC 3261 §15).

Deregisters this call from the RTP multiplexer, then delegates the BYE signaling to Dialog.bye, which constructs and sends the BYE request, removes the dialog from the SIP session's registry, and awaits the 200 OK acknowledgment.

The method is a no-op when no dialog is associated with this call.

Source code in voip/rtp.py
async def hang_up(self) -> None:
    """
    Terminate the call by sending a SIP BYE request [RFC 3261 §15].

    Deregisters this call from the RTP multiplexer, then delegates the
    BYE signaling to [Dialog.bye][voip.sip.Dialog.bye], which
    constructs and sends the BYE request, removes the dialog from the
    SIP session's registry, and awaits the 200 OK acknowledgment.

    The method is a no-op when no dialog is associated with this call.

    [RFC 3261 §15]: https://datatracker.ietf.org/doc/html/rfc3261#section-15
    """
    if self.dialog is None:
        return
    # Deregister the RTP handler for this call so no further media is
    # dispatched while the BYE is in flight.
    _not_found = object()
    remote_addr = next(
        (addr for addr, call in self.rtp.calls.items() if call is self),
        _not_found,
    )
    if remote_addr is not _not_found:
        self.rtp.unregister_call(remote_addr)
    await self.dialog.bye()

negotiate_codec(remote_media) classmethod

Negotiate a media codec from the remote SDP offer.

Override in subclasses to implement codec selection. The SIP layer calls this before sending a 200 OK; if the method raises the exception propagates and the call is not answered.

Parameters:

Name Type Description Default
remote_media MediaDescription

The SDP m=audio section from the remote INVITE.

required

Returns:

Type Description
MediaDescription

A MediaDescription with the chosen codec.

Raises:

Type Description
NotImplementedError

When not overridden by a subclass.

Source code in voip/rtp.py
@classmethod
def negotiate_codec(cls, remote_media: MediaDescription) -> MediaDescription:
    """Negotiate a media codec from the remote SDP offer.

    Override in subclasses to implement codec selection. The SIP layer
    calls this before sending a 200 OK; if the method raises the exception
    propagates and the call is not answered.

    Args:
        remote_media: The SDP ``m=audio`` section from the remote INVITE.

    Returns:
        A `MediaDescription` with the chosen codec.

    Raises:
        NotImplementedError: When not overridden by a subclass.
    """
    raise NotImplementedError(
        f"{cls.__name__} does not implement negotiate_codec. "
        "Override this classmethod in a subclass (e.g. AudioCall) to "
        "support codec negotiation."
    )

packet_received(packet, addr)

Handle a parsed RTP packet. Override in subclasses to process media.

Parameters:

Name Type Description Default
packet RTPPacket

Parsed RTP packet.

required
addr NetworkAddress

Remote (host, port) the packet arrived from.

required
Source code in voip/rtp.py
def packet_received(self, packet: RTPPacket, addr: NetworkAddress) -> None:
    """Handle a parsed RTP packet. Override in subclasses to process media.

    Args:
        packet: Parsed RTP packet.
        addr: Remote ``(host, port)`` the packet arrived from.
    """

sdp_formats() classmethod

Return the list of supported payload formats for outbound SDP offers.

Override in subclasses to advertise codec capabilities. AudioCall overrides this to return all supported codecs in priority order.

Returns:

Type Description
list[RTPPayloadFormat]

List of RTPPayloadFormat objects describing the supported codecs.

Source code in voip/rtp.py
@classmethod
def sdp_formats(cls) -> list[RTPPayloadFormat]:
    """Return the list of supported payload formats for outbound SDP offers.

    Override in subclasses to advertise codec capabilities.
    [AudioCall][voip.audio.AudioCall] overrides this to return all
    supported codecs in priority order.

    Returns:
        List of [RTPPayloadFormat][voip.sdp.types.RTPPayloadFormat]
        objects describing the supported codecs.
    """
    from voip.sdp.types import StaticPayloadType  # noqa: PLC0415

    return [RTPPayloadFormat.from_pt(StaticPayloadType.PCMU.pt)]

send_packet(packet, addr)

Serialize packet and send it via the shared RTP socket.

Encrypts the packet with the call's SRTP session when one is set.

Parameters:

Name Type Description Default
packet RTPPacket

RTP packet to send.

required
addr NetworkAddress

Destination (host, port).

required
Source code in voip/rtp.py
def send_packet(self, packet: RTPPacket, addr: NetworkAddress) -> None:
    """Serialize *packet* and send it via the shared RTP socket.

    Encrypts the packet with the call's SRTP session when one is set.

    Args:
        packet: RTP packet to send.
        addr: Destination ``(host, port)``.
    """
    data = bytes(packet)
    if self.srtp is not None:
        data = self.srtp.encrypt(data)
    self.rtp.send(data, addr)

Audio Handling

voip.audio.AudioCall dataclass

Bases: Session

RTP call handler for audio calls supporting Opus, G.722, PCMA, and PCMU.

Attributes:

Name Type Description
supported_codecs list[type[RTPCodec]]

Preferred codecs in priority order (highest first).

rpt_packet_duration timedelta

Wall-clock spacing between outbound RTP packets.

Parameters:

Name Type Description Default
sampling_rate_hz int

Target sample rate in Hz for decoded audio delivered to audio_received.

16000
Source code in voip/audio.py
@dataclasses.dataclass(kw_only=True)
class AudioCall(Session):
    """
    RTP call handler for audio calls supporting Opus, G.722, PCMA, and PCMU.

    Attributes:
        supported_codecs: Preferred codecs in priority order (highest first).
        rpt_packet_duration: Wall-clock spacing between outbound RTP packets.

    Args:
        sampling_rate_hz: Target sample rate in Hz for decoded audio
             delivered to `audio_received`.
    """

    supported_codecs: ClassVar[list[type[RTPCodec]]] = [
        codecs.REGISTRY[name]
        for name in ("opus", "g722", "pcma", "pcmu")
        if name in codecs.REGISTRY
    ]
    rpt_packet_duration: ClassVar[datetime.timedelta] = datetime.timedelta(
        milliseconds=20
    )
    sampling_rate_hz: int = 16000

    codec: type[RTPCodec] = dataclasses.field(init=False, repr=False)
    payload_decoder: PayloadDecoder = dataclasses.field(init=False, repr=False)
    rtp_sequence_number: int = dataclasses.field(init=False, repr=False, default=0)
    rtp_timestamp: int = dataclasses.field(init=False, repr=False, default=0)
    rtp_ssrc: int = dataclasses.field(
        init=False, repr=False, default_factory=generate_ssrc
    )
    send_audio_lock: asyncio.Lock = dataclasses.field(
        default_factory=asyncio.Lock,
        init=False,
    )
    outbound_handle: asyncio.TimerHandle | None = dataclasses.field(
        default=None,
        init=False,
        repr=False,
    )

    def __post_init__(self) -> None:
        fmt = self.media.fmt[0]
        if fmt.encoding_name is None:
            raise ValueError(f"No encoding name for payload type {fmt.payload_type}")
        self.codec = codecs.get(fmt.encoding_name)
        self.payload_decoder = self.codec.create_decoder(
            self.sampling_rate_hz, input_rate_hz=self.sample_rate
        )

    @property
    def payload_type(self) -> int:
        """Negotiated RTP payload type number."""
        return self.codec.payload_type

    @property
    def sample_rate(self) -> int:
        """SDP-negotiated audio sample rate in Hz.

        Reflects the value from the remote `a=rtpmap` line.  For G.722 this
        is 8000 per RFC 3551 even though the codec runs at 16000 Hz
        internally; use `codec.sample_rate_hz` to get the actual audio rate.
        """
        return self.media.fmt[0].sample_rate or 8000

    @classmethod
    def negotiate_codec(cls, remote_media: MediaDescription) -> MediaDescription:
        if not remote_media.fmt:
            raise NotImplementedError("Remote SDP offer contains no audio formats")

        remote_by_pt = {f.payload_type: f for f in remote_media.fmt}
        for codec in cls.supported_codecs:
            if codec.payload_type in remote_by_pt:
                remote_fmt = remote_by_pt[codec.payload_type]
                chosen = (
                    remote_fmt
                    if remote_fmt.encoding_name
                    else codec.to_payload_format()
                )
                return MediaDescription(
                    media="audio", port=0, proto=remote_media.proto, fmt=[chosen]
                )
            for remote_fmt in remote_media.fmt:
                if (
                    remote_fmt.encoding_name is not None
                    and remote_fmt.encoding_name.lower() == codec.encoding_name
                ):
                    return MediaDescription(
                        media="audio",
                        port=0,
                        proto=remote_media.proto,
                        fmt=[remote_fmt],
                    )

        raise NotImplementedError(
            f"No supported codec found in remote offer "
            f"{[f.payload_type for f in remote_media.fmt]!r}. "
            f"Supported: {[c.encoding_name for c in cls.supported_codecs]!r}"
        )

    @classmethod
    def sdp_formats(cls) -> list[RTPPayloadFormat]:
        """Return all supported payload formats for outbound SDP offers.

        Lists all codecs in `supported_codecs` priority order so the remote
        can select the best available codec.

        Returns:
            List of [RTPPayloadFormat][voip.sdp.types.RTPPayloadFormat]
            objects for every codec in `supported_codecs`.
        """
        return [codec.to_payload_format() for codec in cls.supported_codecs]

    def packet_received(self, packet: RTPPacket, addr: tuple[str, int]) -> None:
        if packet.payload:
            asyncio.create_task(self.emit_audio(packet))

    async def emit_audio(self, packet: RTPPacket) -> None:
        audio = self.decode_payload(packet.payload)
        if audio.size > 0:
            self.audio_received(audio=audio, rms=self.rms(audio))

    def decode_payload(self, payload: bytes) -> np.ndarray:
        return self.payload_decoder.decode(payload)

    def next_rtp_packet(self, payload: bytes) -> RTPPacket:
        packet = RTPPacket(
            payload_type=self.codec.payload_type,
            sequence_number=self.rtp_sequence_number,
            timestamp=self.rtp_timestamp,
            ssrc=self.rtp_ssrc,
            payload=payload,
        )
        self.rtp_sequence_number += 1
        self.rtp_timestamp += self.codec.timestamp_increment
        return packet

    @classmethod
    def resample(
        cls, audio: np.ndarray, source_rate_hz: int, destination_rate_hz: int
    ) -> np.ndarray:
        return RTPCodec.resample(audio, source_rate_hz, destination_rate_hz)

    @staticmethod
    def rms(audio: np.ndarray) -> float:
        """
        Calculate the Root Mean Square (RMS) of an audio signal.

        Args:
            audio: Float32 mono PCM array.

        Returns:
            RMS value as a proxy for signal strength.
        """
        return float(np.sqrt(np.mean(np.square(audio))))

    def cancel_outbound_audio(self) -> None:
        """Stop the current outbound audio while it is being sent."""
        try:
            self.outbound_handle.cancel()
        except AttributeError:
            pass
        else:
            self.outbound_handle = None

    def on_audio_sent(self) -> None:
        """Handle completion of an outbound audio stream.

        Called once the last RTP packet of an outbound stream has been
        dispatched (i.e. `outbound_handle` transitions to ``None``).
        The base implementation is a no-op.  Override in subclasses to
        trigger post-audio actions, for example hanging up after
        [SayCall][voip.ai.SayCall] finishes speaking.
        """

    def _dispatch_next_packet(
        self,
        packets: Iterator[bytes],
        remote_addr: tuple[str, int],
        next_send_at: float,
    ) -> None:
        try:
            payload = next(packets)
        except StopIteration:
            self.outbound_handle = None
            self.on_audio_sent()
        else:
            self.send_packet(self.next_rtp_packet(payload), remote_addr)
            duration_seconds = self.rpt_packet_duration.total_seconds()
            next_deadline = next_send_at + duration_seconds
            loop = asyncio.get_running_loop()
            self.outbound_handle = loop.call_at(
                next_deadline,
                self._dispatch_next_packet,
                packets,
                remote_addr,
                next_deadline,
            )

    async def send_audio(self, audio: np.ndarray) -> None:
        """
        Encode `audio` with the negotiated codec and transmit via RTP.

        Args:
            audio: Float32 mono PCM at `codec.sample_rate_hz` Hz.
        """
        remote_addr = next(
            (addr for addr, call in self.rtp.calls.items() if call is self),
            None,
        )
        match remote_addr:
            case None:
                logger.warning(
                    "No remote RTP address for this call; dropping audio",
                )
                return
            case _:
                pass
        async with self.send_audio_lock:
            self.cancel_outbound_audio()
            loop = asyncio.get_running_loop()
            next_send_at = loop.time()
            self._dispatch_next_packet(
                self.codec.packetize(audio),
                remote_addr,
                next_send_at,
            )

    def audio_received(self, *, audio: np.ndarray, rms: float) -> None:
        """
        Handle decoded audio. Override in subclasses.

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
            rms: Root Mean Square of the decoded PCM, as a proxy for signal strength.
        """

payload_type property

Negotiated RTP payload type number.

sample_rate property

SDP-negotiated audio sample rate in Hz.

Reflects the value from the remote a=rtpmap line. For G.722 this is 8000 per RFC 3551 even though the codec runs at 16000 Hz internally; use codec.sample_rate_hz to get the actual audio rate.
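RTP timestamps advance at the SDP clock rate, not the codec's internal rate: one 20 ms packet at an 8000 Hz clock advances the timestamp by 160 ticks whether the payload is PCMU or G.722. A self-contained sketch of the sequence/timestamp bookkeeping that `next_rtp_packet` performs (names here are illustrative, not library API):

```python
from dataclasses import dataclass


@dataclass
class PacketClock:
    # Illustrative stand-in for AudioCall's per-packet counters.
    clock_rate_hz: int = 8000  # SDP a=rtpmap rate (8000 for G.722 per RFC 3551)
    packet_ms: int = 20        # packetization interval
    sequence_number: int = 0
    timestamp: int = 0

    def next(self) -> tuple[int, int]:
        """Return (seq, ts) for the next packet, then advance both counters."""
        out = (self.sequence_number, self.timestamp)
        self.sequence_number = (self.sequence_number + 1) & 0xFFFF  # 16-bit wrap
        ticks = self.clock_rate_hz * self.packet_ms // 1000
        self.timestamp = (self.timestamp + ticks) & 0xFFFFFFFF      # 32-bit wrap
        return out


clock = PacketClock()
first = clock.next()   # (0, 0)
second = clock.next()  # (1, 160): 20 ms at 8000 Hz = 160 ticks
```

The masking shows why RTP sequence numbers and timestamps wrap at 16 and 32 bits respectively.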

audio_received(*, audio, rms)

Handle decoded audio. Override in subclasses.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz.

required
rms float

Root Mean Square of the decoded PCM, as a proxy for signal strength.

required
Source code in voip/audio.py
def audio_received(self, *, audio: np.ndarray, rms: float) -> None:
    """
    Handle decoded audio. Override in subclasses.

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
        rms: Root Mean Square of the decoded PCM, as a proxy for signal strength.
    """

cancel_outbound_audio()

Stop the current outbound audio while it is being sent.

Source code in voip/audio.py
def cancel_outbound_audio(self) -> None:
    """Stop the current outbound audio while it is being sent."""
    try:
        self.outbound_handle.cancel()
    except AttributeError:
        pass
    else:
        self.outbound_handle = None

on_audio_sent()

Handle completion of an outbound audio stream.

Called once the last RTP packet of an outbound stream has been dispatched (i.e. outbound_handle transitions to None). The base implementation is a no-op. Override in subclasses to trigger post-audio actions, for example hanging up after SayCall finishes speaking.

Source code in voip/audio.py
def on_audio_sent(self) -> None:
    """Handle completion of an outbound audio stream.

    Called once the last RTP packet of an outbound stream has been
    dispatched (i.e. `outbound_handle` transitions to ``None``).
    The base implementation is a no-op.  Override in subclasses to
    trigger post-audio actions, for example hanging up after
    [SayCall][voip.ai.SayCall] finishes speaking.
    """

rms(audio) staticmethod

Calculate the Root Mean Square (RMS) of an audio signal.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array.

required

Returns:

Type Description
float

RMS value as a proxy for signal strength.

Source code in voip/audio.py
@staticmethod
def rms(audio: np.ndarray) -> float:
    """
    Calculate the Root Mean Square (RMS) of an audio signal.

    Args:
        audio: Float32 mono PCM array.

    Returns:
        RMS value as a proxy for signal strength.
    """
    return float(np.sqrt(np.mean(np.square(audio))))
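For intuition about threshold values: a sine of amplitude A has an RMS of A/√2, so float PCM in [-1, 1] yields RMS values well below 1.0, and a threshold like voice_rms_threshold = 0.001 corresponds to a very quiet signal. A dependency-free sketch of the same computation:

```python
import math


def rms(samples: list[float]) -> float:
    # Same computation as AudioCall.rms, without the numpy dependency.
    return math.sqrt(sum(s * s for s in samples) / len(samples))


# One full cycle of a 0.5-amplitude sine, sampled at 1000 points.
sine = [0.5 * math.sin(2 * math.pi * n / 1000) for n in range(1000)]
level = rms(sine)  # 0.5 / sqrt(2), about 0.354
```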

sdp_formats() classmethod

Return all supported payload formats for outbound SDP offers.

Lists all codecs in supported_codecs priority order so the remote can select the best available codec.

Returns:

Type Description
list[RTPPayloadFormat]

List of RTPPayloadFormat objects for every codec in supported_codecs.

Source code in voip/audio.py
@classmethod
def sdp_formats(cls) -> list[RTPPayloadFormat]:
    """Return all supported payload formats for outbound SDP offers.

    Lists all codecs in `supported_codecs` priority order so the remote
    can select the best available codec.

    Returns:
        List of [RTPPayloadFormat][voip.sdp.types.RTPPayloadFormat]
        objects for every codec in `supported_codecs`.
    """
    return [codec.to_payload_format() for codec in cls.supported_codecs]

send_audio(audio) async

Encode audio with the negotiated codec and transmit via RTP.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM at codec.sample_rate_hz Hz.

required
Source code in voip/audio.py
async def send_audio(self, audio: np.ndarray) -> None:
    """
    Encode `audio` with the negotiated codec and transmit via RTP.

    Args:
        audio: Float32 mono PCM at `codec.sample_rate_hz` Hz.
    """
    remote_addr = next(
        (addr for addr, call in self.rtp.calls.items() if call is self),
        None,
    )
    match remote_addr:
        case None:
            logger.warning(
                "No remote RTP address for this call; dropping audio",
            )
            return
        case _:
            pass
    async with self.send_audio_lock:
        self.cancel_outbound_audio()
        loop = asyncio.get_running_loop()
        next_send_at = loop.time()
        self._dispatch_next_packet(
            self.codec.packetize(audio),
            remote_addr,
            next_send_at,
        )
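send_audio paces packets by absolute deadlines (loop.call_at with the previous deadline plus the packet duration) rather than relative call_later delays, so scheduling jitter from one late callback does not accumulate over a long stream. A runnable sketch of the same pattern with a shortened interval (all names here are illustrative):

```python
import asyncio


def dispatch(packets, interval_s, sent, done, next_at):
    """Send one payload, then reschedule at an absolute deadline."""
    try:
        payload = next(packets)
    except StopIteration:
        done.set_result(None)  # stream finished
        return
    sent.append(payload)
    loop = asyncio.get_running_loop()
    # Absolute deadline: lateness of this callback is not carried forward.
    deadline = next_at + interval_s
    loop.call_at(deadline, dispatch, packets, interval_s, sent, done, deadline)


async def main():
    loop = asyncio.get_running_loop()
    sent, done = [], loop.create_future()
    dispatch(iter([b"a", b"b", b"c"]), 0.005, sent, done, loop.time())
    await done
    return sent


sent = asyncio.run(main())  # [b'a', b'b', b'c']
```

The future standing in for `on_audio_sent` resolves once the iterator is exhausted, just as the library's `_dispatch_next_packet` clears `outbound_handle` and fires its completion hook.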

voip.audio.VoiceActivityCall dataclass

Bases: AudioCall

AudioCall with energy-based Voice Activity Detection (VAD) and speech buffering.

Full utterances are buffered and passed to voice_received. Silent chunks are dropped from the audio stream.

Override that method in subclasses to process complete speech segments (e.g. transcribe them, echo them back, etc.) instead of raw audio frames.

An utterance is considered complete when the RMS of the buffered audio drops below voice_rms_threshold for at least silence_gap.

Full utterances with an RMS sound power below utterances_rms_threshold are discarded.

A full utterance must be separated from the previous one by at least the silence_gap to be considered complete and passed to voice_received.

Example

The following example shows how to use VoiceActivityCall to echo a caller's voice back to them, much like EchoCall.

import dataclasses

import numpy as np

from voip.audio import VoiceActivityCall


@dataclasses.dataclass(kw_only=True)
class EchoCall(VoiceActivityCall):

    async def voice_received(self, audio: np.ndarray) -> None:
        resampled = self.resample(
            audio, self.sampling_rate_hz, self.codec.sample_rate_hz
        )
        await self.send_audio(resampled)

Parameters:

Name Type Description Default
voice_rms_threshold float

Minimum RMS sound power for voice detection.

0.001
utterances_rms_threshold float

Minimum RMS sound power for an utterance.

0.01
silence_gap timedelta

Minimum duration of silence to consider an utterance complete.

timedelta(milliseconds=200)
Source code in voip/audio.py
@dataclasses.dataclass(kw_only=True)
class VoiceActivityCall(AudioCall):
    """
    AudioCall with energy-based Voice Activity Detection (VAD) and speech buffering.

    Full utterances are buffered and passed to
    [voice_received][voip.audio.VoiceActivityCall.voice_received].
    Silent chunks are dropped from the audio stream.

    Override that method in subclasses to process complete speech segments
    (e.g. transcribe them, echo them back, etc.) instead of raw audio frames.

    An utterance is considered complete when the RMS of the buffered audio
    drops below `voice_rms_threshold` for at least `silence_gap`.

    Full utterances with an RMS sound power below `utterances_rms_threshold`
    are discarded.

    A full utterance must be separated from the previous one by at least the
    `silence_gap` to be considered complete and passed to
    [voice_received][voip.audio.VoiceActivityCall.voice_received].

    Example:
        The following example shows how to use `VoiceActivityCall` to echo a caller's
        voice back to them, much like [EchoCall][voip.audio.EchoCall].

        ```python
        import dataclasses

        import numpy as np

        from voip.audio import VoiceActivityCall


        @dataclasses.dataclass(kw_only=True)
        class EchoCall(VoiceActivityCall):

            async def voice_received(self, audio: np.ndarray) -> None:
                resampled = self.resample(
                    audio, self.sampling_rate_hz, self.codec.sample_rate_hz
                )
                await self.send_audio(resampled)
        ```

    Args:
        voice_rms_threshold: Minimum RMS sound power for voice detection.
        utterances_rms_threshold: Minimum RMS sound power for an utterance.
        silence_gap: Minimum duration of silence to consider an utterance complete.
    """

    voice_rms_threshold: float = 0.001
    utterances_rms_threshold: float = 0.01
    silence_gap: datetime.timedelta = dataclasses.field(
        default=datetime.timedelta(milliseconds=200)
    )

    _speech_buffer: np.ndarray = dataclasses.field(
        init=False, repr=False, default_factory=lambda: np.empty((0,), dtype=np.float32)
    )
    _flush_voice_buffer_handle: asyncio.TimerHandle | None = dataclasses.field(
        init=False, repr=False, default=None
    )

    def audio_received(self, *, audio: np.ndarray, rms: float) -> None:
        self._speech_buffer = np.concatenate((self._speech_buffer, audio))
        if rms > self.voice_rms_threshold:
            self.on_audio_speech()
        else:
            self.on_audio_silence()

    def on_audio_speech(self) -> None:
        if self._flush_voice_buffer_handle is not None:
            self._flush_voice_buffer_handle.cancel()
            self._flush_voice_buffer_handle = None

    def on_audio_silence(self) -> None:
        if self._flush_voice_buffer_handle is None:
            loop = asyncio.get_event_loop()
            self._flush_voice_buffer_handle = loop.call_later(
                self.silence_gap.total_seconds(),
                self.flush_voice_buffer,
            )

    def flush_voice_buffer(self) -> None:
        self._flush_voice_buffer_handle = None
        # Discard buffers shorter than one silence gap or quieter than the
        # utterance threshold.
        if not (
            len(self._speech_buffer)
            < self.sampling_rate_hz * self.silence_gap.total_seconds()
            or self.rms(self._speech_buffer) < self.utterances_rms_threshold
        ):
            asyncio.create_task(self.voice_received(self._speech_buffer.copy()))
        self._speech_buffer = np.empty((0,), dtype=np.float32)

    async def voice_received(self, audio: np.ndarray) -> None:
        """Handle the flushed speech buffer.  Override in subclasses.

        This base implementation is a no-op.  Subclasses must override this
        method to process the buffered utterance (e.g. echo it back, transcribe
        it, etc.).

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz
                containing the full buffered utterance.
        """
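
The speech/silence debounce in `on_audio_speech`/`on_audio_silence` can be sketched in isolation: silence arms a timer that flushes after `silence_gap`, and speech arriving before the timer fires cancels it. A minimal standalone analogue, with no VoIP machinery:

```python
import asyncio
import datetime

silence_gap = datetime.timedelta(milliseconds=50)
flushed: list[str] = []

async def main() -> None:
    loop = asyncio.get_running_loop()
    # Silence detected: schedule a flush once silence_gap elapses.
    handle = loop.call_later(
        silence_gap.total_seconds(), lambda: flushed.append("utterance")
    )
    await asyncio.sleep(0.01)
    # Speech resumes before the gap elapses: cancel the pending flush.
    handle.cancel()
    # Silence again: re-arm the timer; this one is allowed to fire.
    handle = loop.call_later(
        silence_gap.total_seconds(), lambda: flushed.append("utterance")
    )
    await asyncio.sleep(0.1)

asyncio.run(main())
assert flushed == ["utterance"]  # only the uninterrupted silence flushed
```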

voice_received(audio) async

Handle the flushed speech buffer. Override in subclasses.

This base implementation is a no-op. Subclasses must override this method to process the buffered utterance (e.g. echo it back, transcribe it, etc.).

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz containing the full buffered utterance.

required
Source code in voip/audio.py
async def voice_received(self, audio: np.ndarray) -> None:
    """Handle the flushed speech buffer.  Override in subclasses.

    This base implementation is a no-op.  Subclasses must override this
    method to process the buffered utterance (e.g. echo it back, transcribe
    it, etc.).

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz
            containing the full buffered utterance.
    """

voip.audio.EchoCall dataclass

Bases: VoiceActivityCall

Echo the caller's speech back after they finish speaking.

Buffers a full utterance and replays it once a sustained silence lasting silence_gap seconds is detected. This gives the caller a natural echo of their own voice, useful for network latency testing and call-flow demonstrations.

Example
class MySession(SessionInitiationProtocol):
    def call_received(self, request: Request) -> None:
        self.answer(request=request, session_class=EchoCall)
Source code in voip/audio.py
@dataclasses.dataclass(kw_only=True, slots=True)
class EchoCall(VoiceActivityCall):
    """Echo the caller's speech back after they finish speaking.

    Buffers a full utterance and replays it once a sustained silence lasting
    `silence_gap` seconds is detected. This gives the caller a natural echo
    of their own voice, useful for network latency testing and call-flow
    demonstrations.

    Example:
        ```python
        class MySession(SessionInitiationProtocol):
            def call_received(self, request: Request) -> None:
                self.answer(request=request, session_class=EchoCall)
        ```
    """

    async def voice_received(self, audio: np.ndarray) -> None:
        resampled = self.resample(
            audio, self.sampling_rate_hz, self.codec.sample_rate_hz
        )
        await self.send_audio(resampled)

AI Calls

voip.ai.TranscribeCall dataclass

Bases: VoiceActivityCall

Transcribe incoming call audio.

Audio is decoded by AudioCall on a per-packet basis and delivered to audio_received, which applies an energy-based voice activity detector (VAD) from VoiceActivityCall. All audio frames (speech and silence) are accumulated until silence is sustained for silence_gap seconds, then the entire utterance is sent to Whisper as one chunk. This avoids cutting sentences in the middle and prevents background microphone noise from being passed to Whisper as spurious audio.

Example

Override transcription_received to handle the resulting text:

class MyCall(TranscribeCall):
    def transcription_received(self, text: str) -> None:
        print(text)

class MySession(SessionInitiationProtocol):
    def call_received(self, request: Request) -> None:
        self.answer(request=request, session_class=MyCall)

To share one model instance across multiple calls (recommended, so the model is loaded only once), pass a preloaded WhisperModel:

shared_model = WhisperModel("base")

class MyCall(TranscribeCall):
    stt_model = shared_model
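
Transcription itself runs off the event loop: `transcribe` hands the blocking model call to `run_in_executor` so the loop keeps servicing RTP packets while Whisper works. A self-contained analogue of that pattern, with `fake_transcribe` standing in for the real model call:

```python
import asyncio

def fake_transcribe(audio: bytes) -> str:
    # Stand-in for the blocking WhisperModel call.
    return "hello world"

async def transcribe(audio: bytes) -> str:
    loop = asyncio.get_running_loop()
    # Off-load the CPU-bound work to the default thread pool executor.
    return await loop.run_in_executor(None, fake_transcribe, audio)

print(asyncio.run(transcribe(b"\x00" * 320)))  # hello world
```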

Parameters:

Name Type Description Default
stt_model WhisperModel

Whisper model to use for transcription. Defaults to "base".

(lambda: WhisperModel('base'))()
Source code in voip/ai.py
@dataclasses.dataclass(kw_only=True, slots=True)
class TranscribeCall(VoiceActivityCall):
    """Transcribe incoming call audio.

    Audio is decoded by [AudioCall][voip.audio.AudioCall] on a per-packet
    basis and delivered to [audio_received][voip.audio.AudioCall.audio_received],
    which applies an energy-based voice activity detector (VAD) from
    [VoiceActivityCall][voip.audio.VoiceActivityCall].  All audio frames
    (speech and silence) are accumulated until silence is sustained for
    `silence_gap` seconds, then the entire utterance is sent to Whisper as
    one chunk.  This avoids cutting sentences in the middle and prevents
    background microphone noise from being passed to Whisper as spurious audio.

    Example:
        Override [transcription_received][voip.ai.TranscribeCall.transcription_received]
        to handle the resulting text:

        ```python
        class MyCall(TranscribeCall):
            def transcription_received(self, text: str) -> None:
                print(text)


        class MySession(SessionInitiationProtocol):
            def call_received(self, request: Request) -> None:
                self.answer(request=request, session_class=MyCall)
        ```

        To share one model instance across multiple calls (recommended, so the
        model is loaded only once), pass a preloaded `WhisperModel`:

        ```python
        shared_model = WhisperModel("base")

        class MyCall(TranscribeCall):
            stt_model = shared_model
        ```

    Args:
        stt_model: Whisper model to use for transcription.  Defaults to "base".

    """

    stt_model: WhisperModel = dataclasses.field(
        default_factory=lambda: WhisperModel("base")
    )

    async def voice_received(self, audio: np.ndarray) -> None:
        await self.transcribe(audio)

    async def transcribe(self, audio: np.ndarray) -> None:
        loop = asyncio.get_running_loop()
        raw = await loop.run_in_executor(None, self.run_transcription, audio)
        if text := raw.strip():
            self.transcription_received(text)

    def run_transcription(self, audio: np.ndarray) -> str:
        segments, _ = self.stt_model.transcribe(audio)
        result = "".join(segment.text for segment in segments)
        logger.debug("Transcription result: %r", result)
        return result

    def transcription_received(self, text: str) -> None:
        """Handle a transcription result.  Override in subclasses.

        Args:
            text: Transcribed text for this audio chunk (already stripped).
        """

transcription_received(text)

Handle a transcription result. Override in subclasses.

Parameters:

Name Type Description Default
text str

Transcribed text for this audio chunk (already stripped).

required
Source code in voip/ai.py
def transcription_received(self, text: str) -> None:
    """Handle a transcription result.  Override in subclasses.

    Args:
        text: Transcribed text for this audio chunk (already stripped).
    """

voip.ai.AgentCall dataclass

Bases: TTSMixin, TranscribeCall

Respond to caller voice inputs with voice responses.

Uses Ollama to generate responses to transcribed text and Pocket TTS to synthesize voice replies.

Parameters:

Name Type Description Default
system_prompt str

Prompt to guide the language model.

'You are a person on a phone call. Keep your answers very brief and conversational. YOU MUST NEVER USE NON-VERBAL CHARACTERS IN YOUR RESPONSES!'
llm_model str

Ollama model to use for text generation.

'ministral-3'
tts_model TTSModel

Pocket TTS model to use for voice synthesis.

(lambda: load_model())()
voice Path | str | Tensor

Voice to use for synthesis.

'azelma'
salutation str

Opening message sent as soon as the call is established.

'Hi.'
audio_interrupt_duration timedelta

Time you have to talk over the agent to interrupt the outbound audio.

timedelta(seconds=0.75)
Source code in voip/ai.py
@dataclasses.dataclass(kw_only=True, slots=True)
class AgentCall(TTSMixin, TranscribeCall):
    """Respond to caller voice inputs with voice responses.

    Uses Ollama to generate responses to transcribed
    text and Pocket TTS to synthesize voice replies.

    Args:
        system_prompt: Prompt to guide the language model.
        llm_model: Ollama model to use for text generation.
        tts_model: Pocket TTS model to use for voice synthesis.
        voice: Voice to use for synthesis.
        salutation: Opening message sent as soon as the call is established.
        audio_interrupt_duration: Time you have to talk over the agent to interrupt the outbound audio.
    """

    system_prompt: str = (
        "You are a person on a phone call."
        " Keep your answers very brief and conversational."
        " YOU MUST NEVER USE NON-VERBAL CHARACTERS IN YOUR RESPONSES!"
    )
    llm_model: str = dataclasses.field(default="ministral-3")
    voice: pathlib.Path | str | torch.Tensor = dataclasses.field(default="azelma")
    salutation: str = dataclasses.field(default="Hi.")
    audio_interrupt_duration: datetime.timedelta = datetime.timedelta(seconds=0.75)

    _messages: list[dict] = dataclasses.field(init=False, repr=False)
    _response_task: asyncio.Task | None = dataclasses.field(
        init=False, repr=False, default=None
    )
    _cancel_audio_handle: asyncio.Handle | None = dataclasses.field(
        init=False, repr=False, default=None
    )

    emoji_pattern: typing.ClassVar[typing.Pattern[str]] = re.compile(
        "["
        "\U0001f600-\U0001f64f"  # emoticons
        "\U0001f300-\U0001f5ff"  # symbols & pictographs
        "\U0001f680-\U0001f6ff"  # transport & map symbols
        "\U0001f1e0-\U0001f1ff"  # flags (iOS)
        "\U00002702-\U000027b0"
        "\U000024c2-\U0001f251"
        "]+",
        flags=re.UNICODE,
    )

    def __post_init__(self) -> None:
        super().__post_init__()
        self._messages = [
            {
                "role": "system",
                "content": self.system_prompt,
            }
        ]
        if self.salutation:
            self._messages.append({"role": "assistant", "content": self.salutation})
            asyncio.create_task(self.send_speech(self.salutation))

    def transcription_received(self, text: str) -> None:
        self.cancel_outbound_audio()
        self._messages.append({"role": "user", "content": text})
        if self._response_task is not None and not self._response_task.done():
            self._response_task.cancel()
        self._response_task = asyncio.create_task(self.respond())

    async def respond(self) -> None:
        response = await ollama.AsyncClient().chat(
            model=self.llm_model,
            messages=self._messages,
        )
        if reply := self.emoji_pattern.sub("", response.message.content or ""):
            self._messages.append({"role": "assistant", "content": reply})
            logger.debug("Agent reply: %r", reply)
            await self.send_speech(reply)

    def on_audio_speech(self) -> None:
        loop = asyncio.get_event_loop()
        if self._cancel_audio_handle is None:
            self._cancel_audio_handle = loop.call_later(
                self.audio_interrupt_duration.total_seconds(),
                self.cancel_outbound_audio,
            )
        super().on_audio_speech()

    def on_audio_silence(self) -> None:
        super().on_audio_silence()
        if self._cancel_audio_handle is not None:
            self._cancel_audio_handle.cancel()
            self._cancel_audio_handle = None
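
The `emoji_pattern` filter above can be exercised on its own. This standalone sketch uses a pared-down subset of its ranges (illustrative, not exhaustive) to strip pictographs an LLM might emit despite the system prompt:

```python
import re

# Subset of AgentCall.emoji_pattern, enough to demonstrate the filtering.
emoji_pattern = re.compile(
    "["
    "\U0001f600-\U0001f64f"  # emoticons
    "\U0001f300-\U0001f5ff"  # symbols & pictographs
    "\U0001f680-\U0001f6ff"  # transport & map symbols
    "]+",
    flags=re.UNICODE,
)

reply = "Sure, see you then! \U0001f600\U0001f680"
clean = emoji_pattern.sub("", reply).strip()
assert clean == "Sure, see you then!"
```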

voip.ai.SayCall dataclass

Bases: TTSMixin, AudioCall

Dial a number, say a message using TTS, and hang up.

Parameters:

Name Type Description Default
text str

Message to speak once the call is established.

required

Source code in voip/ai.py
@dataclasses.dataclass(kw_only=True, slots=True)
class SayCall(TTSMixin, AudioCall):
    """Dial a number, say a message using TTS, and hang up.

    Args:
        text: Message to speak once the call is established.
    """

    text: str

    def __post_init__(self) -> None:
        super().__post_init__()
        asyncio.create_task(self.send_speech(self.text))

    def on_audio_sent(self) -> None:
        asyncio.create_task(self.hang_up())

    async def hang_up(self) -> None:
        await super().hang_up()
        if self.dialog is not None and self.dialog.sip is not None:
            self.dialog.sip.close()