Skip to content

Call legs

RTPCall is the base class for all call leg handlers.

Audio Handling

voip.audio.AudioCall dataclass

Bases: RTPCall

RTP call handler with audio buffering, codec negotiation, decoding, and encoding.

Codec selection is driven by PREFERRED_CODECS. Override that list in a subclass to change priority. The selected codec class is stored on codec after __post_init__ and used for all encode/decode operations.

Source code in voip/audio.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@dataclasses.dataclass
class AudioCall(RTPCall):
    """RTP call handler with audio buffering, codec negotiation, decoding, and encoding.

    Codec selection is driven by `PREFERRED_CODECS`.
    Override that list in a subclass to change priority.  The selected codec
    class is stored on `codec` after `__post_init__` and used for all
    encode/decode operations.
    """

    #: Preferred codecs in priority order (highest priority first).
    #: Populated from [`voip.codecs.REGISTRY`][voip.codecs.REGISTRY] at import
    #: time; falls back to pure-NumPy codecs when the ``pyav`` extra is absent.
    PREFERRED_CODECS: ClassVar[list[type[RTPCodec]]] = [
        codecs.REGISTRY[name]
        for name in ("opus", "g722", "pcma", "pcmu")
        if name in codecs.REGISTRY
    ]

    #: Target sample rate for decoded audio delivered to `audio_received`.
    RESAMPLING_RATE_HZ: ClassVar[int] = 16000

    #: Wall-clock spacing between outbound RTP packets in seconds.
    RTP_PACKET_DURATION_SECS: ClassVar[float] = 0.02

    #: Resolved codec class for this call, set in `__post_init__`.
    codec: type[RTPCodec] = dataclasses.field(init=False, repr=False)

    #: Outbound RTP sequence counter.
    rtp_sequence_number: int = dataclasses.field(init=False, repr=False, default=0)
    #: Outbound RTP timestamp counter.
    rtp_timestamp: int = dataclasses.field(init=False, repr=False, default=0)
    #: Outbound RTP synchronisation source identifier.
    rtp_ssrc: int = dataclasses.field(
        init=False, repr=False, default_factory=generate_ssrc
    )

    #: Strong references to in-flight decode tasks.  The event loop keeps
    #: only weak references to scheduled tasks, so without these a decode
    #: task could be garbage collected before it runs to completion.
    _emit_tasks: set[asyncio.Task] = dataclasses.field(
        init=False, repr=False, default_factory=set
    )

    def __post_init__(self) -> None:
        # Resolve the codec from the first (highest-priority) negotiated format.
        fmt = self.media.fmt[0]
        if fmt.encoding_name is None:
            raise ValueError(f"No encoding name for payload type {fmt.payload_type}")
        self.codec = codecs.get(fmt.encoding_name)
        logger.info(
            json.dumps(
                {
                    "event": "call_started",
                    "caller": repr(self.caller),
                    "codec": fmt.encoding_name,
                    "sample_rate": fmt.sample_rate or 0,
                    "channels": fmt.channels,
                    "payload_type": fmt.payload_type,
                }
            ),
            extra={
                "caller": repr(self.caller),
                "codec": fmt.encoding_name,
                "payload_type": fmt.payload_type,
            },
        )

    @property
    def payload_type(self) -> int:
        """Negotiated RTP payload type number."""
        return self.codec.payload_type

    @property
    def sample_rate(self) -> int:
        """SDP-negotiated audio sample rate in Hz.

        Reflects the value from the remote `a=rtpmap` line.  For G.722 this
        is 8000 per RFC 3551 even though the codec runs at 16000 Hz
        internally; use `codec.sample_rate_hz` to get the actual audio rate.
        """
        return self.media.fmt[0].sample_rate or 8000

    @classmethod
    def negotiate_codec(cls, remote_media: MediaDescription) -> MediaDescription:
        """Select the best codec from the remote SDP offer.

        Iterates `PREFERRED_CODECS`
        in priority order, matching first by payload type number and then by
        encoding name for dynamic payload types.

        Args:
            remote_media: The `m=audio` section from the remote INVITE SDP.

        Returns:
            A [`MediaDescription`][voip.sdp.types.MediaDescription] with the
            chosen codec.

        Raises:
            NotImplementedError: When no offered codec is in `PREFERRED_CODECS`.
        """
        if not remote_media.fmt:
            raise NotImplementedError("Remote SDP offer contains no audio formats")

        remote_by_pt = {f.payload_type: f for f in remote_media.fmt}
        for codec in cls.PREFERRED_CODECS:
            if codec.payload_type in remote_by_pt:
                remote_fmt = remote_by_pt[codec.payload_type]
                # Prefer the remote's own rtpmap parameters when present;
                # otherwise synthesise the codec's canonical format.
                chosen = (
                    remote_fmt
                    if remote_fmt.encoding_name
                    else codec.to_payload_format()
                )
                return MediaDescription(
                    media="audio", port=0, proto=remote_media.proto, fmt=[chosen]
                )
            # Dynamic payload types: fall back to case-insensitive name match.
            for remote_fmt in remote_media.fmt:
                if (
                    remote_fmt.encoding_name is not None
                    and remote_fmt.encoding_name.lower() == codec.encoding_name
                ):
                    return MediaDescription(
                        media="audio",
                        port=0,
                        proto=remote_media.proto,
                        fmt=[remote_fmt],
                    )

        raise NotImplementedError(
            f"No supported codec found in remote offer "
            f"{[f.payload_type for f in remote_media.fmt]!r}. "
            f"Supported: {[c.encoding_name for c in cls.PREFERRED_CODECS]!r}"
        )

    def packet_received(self, packet: RTPPacket, addr: tuple[str, int]) -> None:
        """Schedule audio decoding and delivery for *packet*.

        Ignores packets with an empty payload.

        Args:
            packet: Parsed RTP packet.
            addr: Remote ``(host, port)`` the packet arrived from.
        """
        if not packet.payload:
            return
        task = asyncio.create_task(self.emit_audio(packet))
        # Keep a strong reference until the task finishes; asyncio holds
        # only weak references to tasks, so an unreferenced task may be
        # garbage collected before it ever runs.
        self._emit_tasks.add(task)
        task.add_done_callback(self._emit_tasks.discard)

    async def emit_audio(self, packet: RTPPacket) -> None:
        """Decode *packet* and call [`audio_received`][voip.audio.AudioCall.audio_received].

        Decoding runs in the default thread-pool executor so codec work
        never blocks the event loop.

        Args:
            packet: Parsed RTP packet whose payload will be decoded.
        """
        loop = asyncio.get_running_loop()
        audio = await loop.run_in_executor(None, self.decode_payload, packet.payload)
        if audio.size > 0:
            self.audio_received(
                audio=audio, rms=float(np.sqrt(np.mean(np.square(audio))))
            )

    def decode_payload(self, payload: bytes) -> np.ndarray:
        """Decode an RTP payload to float32 PCM at `RESAMPLING_RATE_HZ`.

        Delegates to the negotiated `codec`,
        passing the SDP-negotiated `sample_rate` as the input rate hint so
        that non-standard variants (e.g. wideband PCMA at 16 000 Hz) are
        handled correctly.

        Args:
            payload: Raw RTP payload bytes.

        Returns:
            Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
        """
        return self.codec.decode(
            payload, self.RESAMPLING_RATE_HZ, input_rate_hz=self.sample_rate
        )

    def audio_received(self, *, audio: np.ndarray, rms: float) -> None:
        """Handle decoded audio.  Override in subclasses.

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
            rms: Root mean square of the decoded PCM, as a proxy for signal
                strength.
        """

    async def send_rtp_audio(self, audio: np.ndarray) -> None:
        """Encode *audio* with the negotiated codec and transmit via RTP.

        Looks up the caller's remote RTP address from the shared
        [`RealtimeTransportProtocol`][voip.rtp.RealtimeTransportProtocol] call
        registry and transmits encoded audio as 20 ms RTP packets, sleeping
        `RTP_PACKET_DURATION_SECS` between each packet.

        Args:
            audio: Float32 mono PCM at `codec.sample_rate_hz` Hz.
        """
        remote_addr = next(
            (addr for addr, call in self.rtp.calls.items() if call is self),
            None,
        )
        if remote_addr is None:
            logger.warning("No remote RTP address for this call; dropping audio")
            return
        for payload in self.codec.packetize(audio):
            self.send_packet(self.next_rtp_packet(payload), remote_addr)
            await asyncio.sleep(self.RTP_PACKET_DURATION_SECS)

    def next_rtp_packet(self, payload: bytes) -> RTPPacket:
        """Create the next outbound RTP packet, incrementing sequence and timestamp.

        Args:
            payload: Encoded audio payload bytes.

        Returns:
            RTP packet ready for transmission.
        """
        packet = RTPPacket(
            payload_type=self.codec.payload_type,
            # Wrap per RFC 3550: 16-bit sequence, 32-bit timestamp.
            sequence_number=self.rtp_sequence_number & 0xFFFF,
            timestamp=self.rtp_timestamp & 0xFFFFFFFF,
            ssrc=self.rtp_ssrc,
            payload=payload,
        )
        self.rtp_sequence_number += 1
        self.rtp_timestamp += self.codec.timestamp_increment
        return packet

    @classmethod
    def resample(
        cls, audio: np.ndarray, source_rate_hz: int, destination_rate_hz: int
    ) -> np.ndarray:
        """Resample *audio* from *source_rate_hz* to *destination_rate_hz*.

        Delegates to [`RTPCodec.resample`][voip.codecs.base.RTPCodec.resample].

        Args:
            audio: Float32 mono PCM array.
            source_rate_hz: Sample rate of *audio* in Hz.
            destination_rate_hz: Target sample rate in Hz.

        Returns:
            Resampled float32 array at *destination_rate_hz* Hz.
        """
        return RTPCodec.resample(audio, source_rate_hz, destination_rate_hz)

payload_type property

Negotiated RTP payload type number.

sample_rate property

SDP-negotiated audio sample rate in Hz.

Reflects the value from the remote a=rtpmap line. For G.722 this is 8000 per RFC 3551 even though the codec runs at 16000 Hz internally; use codec.sample_rate_hz to get the actual audio rate.

audio_received(*, audio, rms)

Handle decoded audio. Override in subclasses.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz.

required
rms float

Root mean square of the decoded PCM, as a proxy for signal strength.

required
Source code in voip/audio.py
212
213
214
215
216
217
218
219
def audio_received(self, *, audio: np.ndarray, rms: float) -> None:
    """Handle a single frame of decoded audio.  Override in subclasses.

    This base implementation is intentionally a no-op; subclasses override
    it to consume the per-packet PCM stream.

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
        rms: Root mean square of the decoded PCM, as a proxy for signal
            strength.
    """

decode_payload(payload)

Decode an RTP payload to float32 PCM at RESAMPLING_RATE_HZ.

Delegates to the negotiated codec, passing the SDP-negotiated sample_rate as the input rate hint so that non-standard variants (e.g. wideband PCMA at 16 000 Hz) are handled correctly.

Parameters:

Name Type Description Default
payload bytes

Raw RTP payload bytes.

required

Returns:

Type Description
ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz.

Source code in voip/audio.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def decode_payload(self, payload: bytes) -> np.ndarray:
    """Convert raw RTP payload bytes into float32 PCM at `RESAMPLING_RATE_HZ`.

    The negotiated `codec` performs the actual decode; the SDP-negotiated
    `sample_rate` is forwarded as the input-rate hint so non-standard
    variants (e.g. wideband PCMA at 16 000 Hz) decode correctly.

    Args:
        payload: Raw RTP payload bytes.

    Returns:
        Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
    """
    target_rate = self.RESAMPLING_RATE_HZ
    return self.codec.decode(payload, target_rate, input_rate_hz=self.sample_rate)

emit_audio(packet) async

Decode packet and call audio_received.

Parameters:

Name Type Description Default
packet RTPPacket

Parsed RTP packet whose payload will be decoded.

required
Source code in voip/audio.py
181
182
183
184
185
186
187
188
189
190
191
192
async def emit_audio(self, packet: RTPPacket) -> None:
    """Decode *packet* off the event loop and hand the PCM to [`audio_received`][voip.audio.AudioCall.audio_received].

    The payload is decoded in the default thread-pool executor so codec
    work never blocks the event loop; empty decode results are dropped.

    Args:
        packet: Parsed RTP packet whose payload will be decoded.
    """
    audio = await asyncio.get_running_loop().run_in_executor(
        None, self.decode_payload, packet.payload
    )
    if audio.size == 0:
        return
    rms = float(np.sqrt(np.mean(np.square(audio))))
    self.audio_received(audio=audio, rms=rms)

negotiate_codec(remote_media) classmethod

Select the best codec from the remote SDP offer.

Iterates PREFERRED_CODECS in priority order, matching first by payload type number and then by encoding name for dynamic payload types.

Parameters:

Name Type Description Default
remote_media MediaDescription

The m=audio section from the remote INVITE SDP.

required

Returns:

Type Description
MediaDescription

A MediaDescription with the

MediaDescription

chosen codec.

Raises:

Type Description
NotImplementedError

When no offered codec is in PREFERRED_CODECS.

Source code in voip/audio.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
@classmethod
def negotiate_codec(cls, remote_media: MediaDescription) -> MediaDescription:
    """Pick the highest-priority codec shared with the remote SDP offer.

    Walks `PREFERRED_CODECS` in order; each candidate is matched first by
    static payload-type number, then by case-insensitive encoding name to
    cover dynamic payload types.

    Args:
        remote_media: The `m=audio` section from the remote INVITE SDP.

    Returns:
        A [`MediaDescription`][voip.sdp.types.MediaDescription] carrying the
        single chosen format.

    Raises:
        NotImplementedError: When no offered codec is in `PREFERRED_CODECS`.
    """
    if not remote_media.fmt:
        raise NotImplementedError("Remote SDP offer contains no audio formats")

    def answer(chosen_fmt):
        # All answers share the same shape: one format, port filled in later.
        return MediaDescription(
            media="audio", port=0, proto=remote_media.proto, fmt=[chosen_fmt]
        )

    offered = {f.payload_type: f for f in remote_media.fmt}
    for codec in cls.PREFERRED_CODECS:
        match = offered.get(codec.payload_type)
        if match is not None:
            # Prefer the remote's own rtpmap parameters when present.
            return answer(match if match.encoding_name else codec.to_payload_format())
        for fmt in remote_media.fmt:
            name = fmt.encoding_name
            if name is not None and name.lower() == codec.encoding_name:
                return answer(fmt)

    raise NotImplementedError(
        f"No supported codec found in remote offer "
        f"{[f.payload_type for f in remote_media.fmt]!r}. "
        f"Supported: {[c.encoding_name for c in cls.PREFERRED_CODECS]!r}"
    )

next_rtp_packet(payload)

Create the next outbound RTP packet, incrementing sequence and timestamp.

Parameters:

Name Type Description Default
payload bytes

Encoded audio payload bytes.

required

Returns:

Type Description
RTPPacket

RTP packet ready for transmission.

Source code in voip/audio.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def next_rtp_packet(self, payload: bytes) -> RTPPacket:
    """Build the next outbound RTP packet and advance the RTP counters.

    Args:
        payload: Encoded audio payload bytes.

    Returns:
        RTP packet ready for transmission.
    """
    sequence = self.rtp_sequence_number
    timestamp = self.rtp_timestamp
    self.rtp_sequence_number = sequence + 1
    self.rtp_timestamp = timestamp + self.codec.timestamp_increment
    # Wrap to the wire widths: 16-bit sequence number, 32-bit timestamp.
    return RTPPacket(
        payload_type=self.codec.payload_type,
        sequence_number=sequence & 0xFFFF,
        timestamp=timestamp & 0xFFFFFFFF,
        ssrc=self.rtp_ssrc,
        payload=payload,
    )

packet_received(packet, addr)

Schedule audio decoding and delivery for packet.

Ignores packets with an empty payload.

Parameters:

Name Type Description Default
packet RTPPacket

Parsed RTP packet.

required
addr tuple[str, int]

Remote (host, port) the packet arrived from.

required
Source code in voip/audio.py
169
170
171
172
173
174
175
176
177
178
179
def packet_received(self, packet: RTPPacket, addr: tuple[str, int]) -> None:
    """Schedule audio decoding and delivery for *packet*.

    Ignores packets with an empty payload.

    Args:
        packet: Parsed RTP packet.
        addr: Remote ``(host, port)`` the packet arrived from.
    """
    if not packet.payload:
        return
    task = asyncio.create_task(self.emit_audio(packet))
    # The event loop holds only weak references to tasks; keep a strong
    # reference until the decode task completes so it cannot be garbage
    # collected mid-flight.
    pending = self.__dict__.setdefault("_emit_tasks", set())
    pending.add(task)
    task.add_done_callback(pending.discard)

resample(audio, source_rate_hz, destination_rate_hz) classmethod

Resample audio from source_rate_hz to destination_rate_hz.

Delegates to RTPCodec.resample.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array.

required
source_rate_hz int

Sample rate of audio in Hz.

required
destination_rate_hz int

Target sample rate in Hz.

required

Returns:

Type Description
ndarray

Resampled float32 array at destination_rate_hz Hz.

Source code in voip/audio.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@classmethod
def resample(
    cls, audio: np.ndarray, source_rate_hz: int, destination_rate_hz: int
) -> np.ndarray:
    """Resample *audio* from *source_rate_hz* to *destination_rate_hz*.

    Delegates to [`RTPCodec.resample`][voip.codecs.base.RTPCodec.resample].
    NOTE(review): appears to exist as a convenience so call subclasses can
    resample without importing the codec base class directly — confirm.

    Args:
        audio: Float32 mono PCM array.
        source_rate_hz: Sample rate of *audio* in Hz.
        destination_rate_hz: Target sample rate in Hz.

    Returns:
        Resampled float32 array at *destination_rate_hz* Hz.
    """
    return RTPCodec.resample(audio, source_rate_hz, destination_rate_hz)

send_rtp_audio(audio) async

Encode audio with the negotiated codec and transmit via RTP.

Looks up the caller's remote RTP address from the shared RealtimeTransportProtocol call registry and transmits encoded audio as 20 ms RTP packets, sleeping RTP_PACKET_DURATION_SECS between each packet.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM at codec.sample_rate_hz Hz.

required
Source code in voip/audio.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
async def send_rtp_audio(self, audio: np.ndarray) -> None:
    """Encode *audio* with the negotiated codec and stream it out over RTP.

    The caller's remote RTP address is looked up in the shared
    [`RealtimeTransportProtocol`][voip.rtp.RealtimeTransportProtocol] call
    registry; encoded audio is then sent as 20 ms RTP packets, paced
    `RTP_PACKET_DURATION_SECS` apart.

    Args:
        audio: Float32 mono PCM at `codec.sample_rate_hz` Hz.
    """
    remote_addr = None
    for addr, call in self.rtp.calls.items():
        if call is self:
            remote_addr = addr
            break
    if remote_addr is None:
        logger.warning("No remote RTP address for this call; dropping audio")
        return
    for payload in self.codec.packetize(audio):
        self.send_packet(self.next_rtp_packet(payload), remote_addr)
        await asyncio.sleep(self.RTP_PACKET_DURATION_SECS)

Voice Activity Detection

voip.audio.VoiceActivityCall dataclass

Bases: AudioCall

AudioCall with energy-based voice activity detection (VAD) and speech buffering.

Accumulates audio frames into speech_buffer based on the result of collect_audio. A debounce timer is armed on silence and fires flush_speech_buffer after silence_gap seconds of sustained quiet. Subclasses implement speech_buffer_ready to handle the buffered utterance.

Override collect_audio to change which frames are accumulated. The default implementation buffers only speech frames (RMS above speech_threshold). To buffer all frames (e.g. for transcription that needs the full utterance including silent pauses), override to always return True.

Attributes:

Name Type Description
speech_threshold float

RMS level below which audio is treated as silence.

silence_gap float

Seconds of sustained silence required to flush the buffer.

Source code in voip/audio.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
@dataclasses.dataclass(kw_only=True)
class VoiceActivityCall(AudioCall):
    """AudioCall with energy-based voice activity detection (VAD) and speech buffering.

    Accumulates audio frames into `speech_buffer` based on the result of
    [`collect_audio`][voip.audio.VoiceActivityCall.collect_audio].  A debounce
    timer is armed on silence and fires
    [`flush_speech_buffer`][voip.audio.VoiceActivityCall.flush_speech_buffer]
    after `silence_gap` seconds of sustained quiet.  Subclasses implement
    [`speech_buffer_ready`][voip.audio.VoiceActivityCall.speech_buffer_ready]
    to handle the buffered utterance.

    Override [`collect_audio`][voip.audio.VoiceActivityCall.collect_audio] to
    change which frames are accumulated.  The default implementation buffers
    only speech frames (RMS above `speech_threshold`).  To buffer all frames
    (e.g. for transcription that needs the full utterance including silent
    pauses), override to always return `True`.

    Attributes:
        speech_threshold: RMS level below which audio is treated as silence.
        silence_gap: Seconds of sustained silence required to flush the buffer.
    """

    #: RMS level below which audio is treated as silence.
    speech_threshold: float = dataclasses.field(default=0.001)
    #: Seconds of sustained silence required to flush the buffer.
    silence_gap: float = dataclasses.field(default=0.5)

    #: Frames accumulated for the current utterance.
    speech_buffer: list[np.ndarray] = dataclasses.field(
        init=False, repr=False, default_factory=list
    )
    #: Pending silence-debounce timer, or ``None`` when disarmed.
    silence_handle: asyncio.TimerHandle | None = dataclasses.field(
        init=False, repr=False, default=None
    )

    #: Strong references to in-flight `speech_buffer_ready` tasks.  asyncio
    #: keeps only weak references to tasks, so these prevent a handler task
    #: from being garbage collected before it completes.
    _speech_tasks: set[asyncio.Task] = dataclasses.field(
        init=False, repr=False, default_factory=set
    )

    def audio_received(self, *, audio: np.ndarray, rms: float) -> None:
        """Buffer the frame per `collect_audio`, then update the VAD debounce state."""
        if self.collect_audio(audio, rms):
            self.speech_buffer.append(audio)
        if rms > self.speech_threshold:
            self.on_audio_speech()
        else:
            self.on_audio_silence()

    def collect_audio(self, audio: np.ndarray, rms: float) -> bool:
        """Return whether to buffer this audio frame.

        The default implementation buffers speech frames only (RMS above
        `speech_threshold`).  Override to change the buffering strategy.

        Args:
            audio: Decoded float32 PCM frame.
            rms: Root mean square of *audio*.

        Returns:
            `True` when the frame should be appended to `speech_buffer`.
        """
        return rms > self.speech_threshold

    def on_audio_speech(self) -> None:
        """Cancel any pending silence timer when speech is detected."""
        if self.silence_handle is not None:
            self.silence_handle.cancel()
            self.silence_handle = None

    def on_audio_silence(self) -> None:
        """Arm the silence debounce timer when speech is buffered."""
        if self.silence_handle is None and self.speech_buffer:
            loop = asyncio.get_running_loop()
            self.silence_handle = loop.call_later(
                self.silence_gap,
                self.flush_speech_buffer,
            )

    def flush_speech_buffer(self) -> None:
        """Concatenate buffered audio and schedule [`speech_buffer_ready`][voip.audio.VoiceActivityCall.speech_buffer_ready].

        Resets speech state so the next utterance starts with a clean buffer.
        """
        self.silence_handle = None
        if not self.speech_buffer:
            return
        audio = np.concatenate(self.speech_buffer)
        self.speech_buffer.clear()
        task = asyncio.create_task(self.speech_buffer_ready(audio))
        # Hold a strong reference until the handler finishes; asyncio only
        # keeps weak references to tasks.
        self._speech_tasks.add(task)
        task.add_done_callback(self._speech_tasks.discard)

    async def speech_buffer_ready(self, audio: np.ndarray) -> None:
        """Handle the flushed speech buffer.  Override in subclasses.

        This base implementation is a no-op.  Subclasses must override this
        method to process the buffered utterance (e.g. echo it back, transcribe
        it, etc.).

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz
                containing the full buffered utterance.
        """

collect_audio(audio, rms)

Return whether to buffer this audio frame.

The default implementation buffers speech frames only (RMS above speech_threshold). Override to change the buffering strategy.

Parameters:

Name Type Description Default
audio ndarray

Decoded float32 PCM frame.

required
rms float

Root mean square of audio.

required

Returns:

Type Description
bool

True when the frame should be appended to speech_buffer.

Source code in voip/audio.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def collect_audio(self, audio: np.ndarray, rms: float) -> bool:
    """Decide whether this frame belongs in `speech_buffer`.

    By default only frames whose energy exceeds `speech_threshold` are
    kept; override to change the strategy (e.g. return `True`
    unconditionally to retain silent pauses as well).

    Args:
        audio: Decoded float32 PCM frame.
        rms: Root mean square of *audio*.

    Returns:
        `True` when the frame should be appended to `speech_buffer`.
    """
    threshold = self.speech_threshold
    return rms > threshold

flush_speech_buffer()

Concatenate buffered audio and schedule speech_buffer_ready.

Resets speech state so the next utterance starts with a clean buffer.

Source code in voip/audio.py
353
354
355
356
357
358
359
360
361
362
363
def flush_speech_buffer(self) -> None:
    """Concatenate buffered audio and schedule [`speech_buffer_ready`][voip.audio.VoiceActivityCall.speech_buffer_ready].

    Resets speech state so the next utterance starts with a clean buffer.
    """
    self.silence_handle = None
    if not self.speech_buffer:
        return
    audio = np.concatenate(self.speech_buffer)
    self.speech_buffer.clear()
    task = asyncio.create_task(self.speech_buffer_ready(audio))
    # asyncio holds only weak references to tasks; retain a strong one
    # until the handler completes so it cannot be garbage collected
    # mid-flight.
    pending = self.__dict__.setdefault("_speech_tasks", set())
    pending.add(task)
    task.add_done_callback(pending.discard)

on_audio_silence()

Arm the silence debounce timer when speech is buffered.

Source code in voip/audio.py
344
345
346
347
348
349
350
351
def on_audio_silence(self) -> None:
    """Arm the silence debounce timer when speech is buffered."""
    if self.silence_handle is not None or not self.speech_buffer:
        return
    self.silence_handle = asyncio.get_running_loop().call_later(
        self.silence_gap, self.flush_speech_buffer
    )

on_audio_speech()

Cancel any pending silence timer when speech is detected.

Source code in voip/audio.py
338
339
340
341
342
def on_audio_speech(self) -> None:
    """Disarm any pending silence timer when speech is detected."""
    handle, self.silence_handle = self.silence_handle, None
    if handle is not None:
        handle.cancel()

speech_buffer_ready(audio) async

Handle the flushed speech buffer. Override in subclasses.

This base implementation is a no-op. Subclasses must override this method to process the buffered utterance (e.g. echo it back, transcribe it, etc.).

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz containing the full buffered utterance.

required
Source code in voip/audio.py
365
366
367
368
369
370
371
372
373
374
375
async def speech_buffer_ready(self, audio: np.ndarray) -> None:
    """Handle the flushed speech buffer.  Override in subclasses.

    This base implementation is a no-op.  Subclasses must override this
    method to process the buffered utterance (e.g. echo it back, transcribe
    it, etc.); it is awaited as a task, so implementations may perform
    asynchronous work.

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz
            containing the full buffered utterance.
    """

Echo Call

voip.audio.EchoCall dataclass

Bases: VoiceActivityCall

RTP call handler that echoes the caller's speech back after they finish speaking.

Accumulates speech audio frames (RMS above speech_threshold) via the VoiceActivityCall VAD machinery and replays them once a sustained silence lasting silence_gap seconds is detected. This gives the caller a natural echo of their own voice, useful for network latency testing and call-flow demonstrations.

Example
class MySession(SessionInitiationProtocol):
    def call_received(self, request: Request) -> None:
        self.answer(request=request, call_class=EchoCall)
Source code in voip/audio.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
@dataclasses.dataclass(kw_only=True)
class EchoCall(VoiceActivityCall):
    """RTP call handler that echoes the caller's speech back after they finish speaking.

    Speech frames (RMS above `speech_threshold`) are collected through the
    [`VoiceActivityCall`][voip.audio.VoiceActivityCall] VAD machinery; once
    silence has lasted `silence_gap` seconds the buffered utterance is
    played back to the caller.  Useful for network latency testing and
    call-flow demonstrations.

    Example:
        ```python
        class MySession(SessionInitiationProtocol):
            def call_received(self, request: Request) -> None:
                self.answer(request=request, call_class=EchoCall)
        ```
    """

    async def speech_buffer_ready(self, audio: np.ndarray) -> None:
        """Play the captured utterance back to the caller over RTP.

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
        """
        # Convert from the internal processing rate to the codec's wire rate
        # before encoding and transmitting.
        wire_audio = self.resample(
            audio, self.RESAMPLING_RATE_HZ, self.codec.sample_rate_hz
        )
        await self.send_rtp_audio(wire_audio)

speech_buffer_ready(audio) async

Resample and transmit buffered speech audio back to the caller.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz.

required
Source code in voip/audio.py
396
397
398
399
400
401
402
403
404
405
async def speech_buffer_ready(self, audio: np.ndarray) -> None:
    """Play the buffered speech back to the caller over RTP.

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
    """
    # Convert from the internal processing rate to the codec's wire rate
    # before encoding and transmitting.
    wire_audio = self.resample(
        audio, self.RESAMPLING_RATE_HZ, self.codec.sample_rate_hz
    )
    await self.send_rtp_audio(wire_audio)

AI / Agentic Calls

voip.ai.TranscribeCall dataclass

Bases: VoiceActivityCall

RTP call handler that transcribes audio with faster-whisper.

Audio is decoded by AudioCall on a per-packet basis and delivered to audio_received, which applies an energy-based voice activity detector (VAD) from VoiceActivityCall. All audio frames (speech and silence) are accumulated until silence is sustained for silence_gap seconds, then the entire utterance is sent to Whisper as one chunk. This avoids cutting sentences in the middle and prevents background microphone noise from being passed to Whisper as spurious audio.

Override transcription_received to handle the resulting text:

class MySession(SessionInitiationProtocol):
    def call_received(self, request: Request) -> None:
        self.answer(request=request, call_class=MyCall)

To share one model instance across multiple calls (recommended, to avoid loading it multiple times), pass a pre-loaded WhisperModel:

shared_model = WhisperModel("base")

class MyCall(TranscribeCall):
    model = shared_model
Source code in voip/ai.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@dataclasses.dataclass(kw_only=True)
class TranscribeCall(VoiceActivityCall):
    """RTP call handler that transcribes audio with faster-whisper.

    Audio is decoded by [`AudioCall`][voip.audio.AudioCall] on a per-packet
    basis and delivered to [`audio_received`][voip.audio.AudioCall.audio_received],
    which applies an energy-based voice activity detector (VAD) from
    [`VoiceActivityCall`][voip.audio.VoiceActivityCall].  All audio frames
    (speech and silence) are accumulated until silence is sustained for
    `silence_gap` seconds, then the entire utterance is sent to Whisper as
    one chunk.  This avoids cutting sentences in the middle and prevents
    background microphone noise from being passed to Whisper as spurious audio.

    Override [`transcription_received`][voip.ai.TranscribeCall.transcription_received]
    to handle the resulting text:

    ```python
    class MySession(SessionInitiationProtocol):
        def call_received(self, request: Request) -> None:
            self.answer(request=request, call_class=MyCall)
    ```

    To share one model instance across multiple calls (recommended, to avoid
    loading it multiple times), pass a pre-loaded `WhisperModel`:

    ```python
    shared_model = WhisperModel("base")

    class MyCall(TranscribeCall):
        model = shared_model
    ```
    """

    #: Whisper model name to load, or a pre-loaded ``WhisperModel`` to share.
    model: str | WhisperModel = dataclasses.field(default="kyutai/stt-1b-en_fr-trfs")
    #: Resolved model instance used for all transcriptions; set in
    #: ``__post_init__`` from ``model``.
    whisper_model: WhisperModel = dataclasses.field(init=False, repr=False)

    def __post_init__(self) -> None:
        super().__post_init__()
        if isinstance(self.model, str):
            logger.debug("Loading Whisper model %r", self.model)
            self.whisper_model = WhisperModel(self.model)
        else:
            # A pre-loaded model was supplied; reuse it instead of loading.
            self.whisper_model = self.model

    def collect_audio(self, audio: np.ndarray, rms: float) -> bool:
        """Buffer all audio frames (speech and silence) for transcription.

        Args:
            audio: Decoded float32 PCM frame.
            rms: Root mean square of *audio*.

        Returns:
            Always `True` so that intra-utterance silences are preserved.
        """
        return True

    async def speech_buffer_ready(self, audio: np.ndarray) -> None:
        """Transcribe the buffered utterance when it meets the minimum length.

        Skips utterances shorter than one second to avoid passing fragments
        to Whisper that would produce low-quality transcriptions.

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
        """
        # One second of audio == RESAMPLING_RATE_HZ samples.
        if len(audio) < self.RESAMPLING_RATE_HZ:
            return
        await self.transcribe(audio)

    async def transcribe(self, audio: np.ndarray) -> None:
        """Transcribe decoded audio and deliver non-empty text to the handler.

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
        """
        # Whisper inference is blocking, so run it on the default executor.
        loop = asyncio.get_running_loop()
        raw = await loop.run_in_executor(None, self.run_transcription, audio)
        if text := raw.strip():
            self.transcription_received(text)

    def run_transcription(self, audio: np.ndarray) -> str:
        """Transcribe a float32 PCM array using the Whisper model.

        Args:
            audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.

        Returns:
            Concatenated transcription text from all segments.
        """
        segments, _ = self.whisper_model.transcribe(audio)
        # ``segments`` is a lazy generator that the join below exhausts; log
        # the joined text, not the spent iterator (which would repr uselessly).
        result = "".join(segment.text for segment in segments)
        logger.debug("Transcription result: %r", result)
        return result

    def transcription_received(self, text: str) -> None:
        """Handle a transcription result.  Override in subclasses.

        Args:
            text: Transcribed text for this audio chunk (already stripped).
        """

collect_audio(audio, rms)

Buffer all audio frames (speech and silence) for transcription.

Parameters:

Name Type Description Default
audio ndarray

Decoded float32 PCM frame.

required
rms float

Root mean square of audio.

required

Returns:

Type Description
bool

Always True so that intra-utterance silences are preserved.

Source code in voip/ai.py
74
75
76
77
78
79
80
81
82
83
84
def collect_audio(self, audio: np.ndarray, rms: float) -> bool:
    """Keep every frame — speech and silence alike — in the utterance buffer.

    Args:
        audio: Decoded float32 PCM frame.
        rms: Root mean square of *audio*.

    Returns:
        Always `True` so that intra-utterance silences are preserved.
    """
    # Whisper handles natural pauses best when nothing is dropped.
    return True

run_transcription(audio)

Transcribe a float32 PCM array using the Whisper model.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz.

required

Returns:

Type Description
str

Concatenated transcription text from all segments.

Source code in voip/ai.py
110
111
112
113
114
115
116
117
118
119
120
121
122
def run_transcription(self, audio: np.ndarray) -> str:
    """Transcribe a float32 PCM array using the Whisper model.

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.

    Returns:
        Concatenated transcription text from all segments.
    """
    segments, _ = self.whisper_model.transcribe(audio)
    # ``segments`` is a lazy generator that the join below exhausts; log the
    # joined text, not the spent iterator (which would repr uselessly).
    result = "".join(segment.text for segment in segments)
    logger.debug("Transcription result: %r", result)
    return result

speech_buffer_ready(audio) async

Transcribe the buffered utterance when it meets the minimum length.

Skips utterances shorter than one second to avoid passing fragments to Whisper that would produce low-quality transcriptions.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz.

required
Source code in voip/ai.py
86
87
88
89
90
91
92
93
94
95
96
97
async def speech_buffer_ready(self, audio: np.ndarray) -> None:
    """Transcribe the buffered utterance when it meets the minimum length.

    Skips utterances shorter than one second to avoid passing fragments
    to Whisper that would produce low-quality transcriptions.

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
    """
    # One second of audio corresponds to RESAMPLING_RATE_HZ samples.
    one_second_of_samples = self.RESAMPLING_RATE_HZ
    if len(audio) >= one_second_of_samples:
        await self.transcribe(audio)

transcribe(audio) async

Transcribe decoded audio and deliver non-empty text to the handler.

Parameters:

Name Type Description Default
audio ndarray

Float32 mono PCM array at RESAMPLING_RATE_HZ Hz.

required
Source code in voip/ai.py
 99
100
101
102
103
104
105
106
107
108
async def transcribe(self, audio: np.ndarray) -> None:
    """Transcribe decoded audio and deliver non-empty text to the handler.

    Args:
        audio: Float32 mono PCM array at `RESAMPLING_RATE_HZ` Hz.
    """
    # Whisper inference is blocking; run it on the default thread executor.
    event_loop = asyncio.get_running_loop()
    transcript = await event_loop.run_in_executor(
        None, self.run_transcription, audio
    )
    stripped = transcript.strip()
    if stripped:
        self.transcription_received(stripped)

transcription_received(text)

Handle a transcription result. Override in subclasses.

Parameters:

Name Type Description Default
text str

Transcribed text for this audio chunk (already stripped).

required
Source code in voip/ai.py
124
125
126
127
128
129
def transcription_received(self, text: str) -> None:
    """Hook for transcription results; the default implementation is a no-op.

    Subclasses override this to act on the recognised text.

    Args:
        text: Transcribed text for this audio chunk (already stripped).
    """

voip.ai.AgentCall dataclass

Bases: TranscribeCall

RTP call handler that responds to caller speech using Ollama and Pocket TTS.

Extends TranscribeCall by feeding each transcription to an Ollama language model, then synthesising the reply as speech with Pocket TTS and streaming it back to the caller via RTP.

Chat history is maintained across turns so the language model can follow the conversation. A built-in system prompt informs the model that it is on a phone call.

To share the TTS model across multiple calls, pass a pre-loaded TTSModel:

shared_tts = TTSModel.load_model()
AgentCall(rtp=..., sip=..., tts_model=shared_tts)
Source code in voip/ai.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@dataclasses.dataclass(kw_only=True)
class AgentCall(TranscribeCall):
    """RTP call handler that responds to caller speech using Ollama and Pocket TTS.

    Extends [`TranscribeCall`][voip.ai.TranscribeCall] by feeding each
    transcription to an Ollama language model, then synthesising the reply as
    speech with Pocket TTS and streaming it back to the caller via RTP.

    Chat history is maintained across turns so the language model can follow
    the conversation.  A built-in system prompt informs the model that it is
    on a phone call.

    To share the TTS model across multiple calls pass a pre-loaded
    `TTSModel`:

    ```python
    shared_tts = TTSModel.load_model()
    AgentCall(rtp=..., sip=..., tts_model=shared_tts)
    ```
    """

    #: System prompt inserted as the first chat message; tells the model it
    #: is on a phone call and should answer briefly.
    system_prompt: str = (
        "You are a person on a phone call. "
        "Keep your answers very brief and conversational."
    )

    #: Ollama model name for generating replies.
    ollama_model: str = dataclasses.field(default="llama3")
    #: Pocket TTS voice name or path to a conditioning audio file.
    voice: str = dataclasses.field(default="azelma")
    #: Pre-loaded Pocket TTS model.  Pass a shared instance to avoid
    #: loading the model separately for each call.
    tts_model: TTSModel | None = dataclasses.field(default=None)

    #: Resolved TTS model (shared instance or freshly loaded in __post_init__).
    tts_instance: TTSModel = dataclasses.field(init=False, repr=False)
    #: Voice-conditioning state derived from ``voice`` by the TTS model.
    voice_state: Any = dataclasses.field(init=False, repr=False)
    #: Chat history (role/content dicts) passed to Ollama on every turn.
    messages: list[dict] = dataclasses.field(init=False, repr=False)
    #: Transcriptions accumulated since the last completed reply.
    pending_text: list[str] = dataclasses.field(init=False, repr=False)
    #: In-flight reply task, cancelled when the caller speaks again.
    response_task: asyncio.Task | None = dataclasses.field(init=False, repr=False)

    def __post_init__(self) -> None:
        super().__post_init__()
        # Reuse a shared TTS model when supplied; otherwise load one per call.
        self.tts_instance = self.tts_model or TTSModel.load_model()
        self.voice_state = self.tts_instance.get_state_for_audio_prompt(self.voice)  # type: ignore[arg-type]
        self.messages = [
            {
                "role": "system",
                "content": self.system_prompt
                + "\n\nYOU MUST NEVER USE NON-VERBAL CHARACTERS IN YOUR RESPONSES!",
            }
        ]
        self.pending_text = []
        self.response_task = None

    def transcription_received(self, text: str) -> None:
        """Queue transcribed caller speech and (re)start the reply task.

        Args:
            text: Transcribed text for this utterance (already stripped).
        """
        match text:
            case "":
                # Defensive guard: `transcribe` only delivers non-empty text,
                # so this branch is not expected to run in practice.
                return
            case _:
                self.pending_text.append(text)
                # The caller spoke again: abandon any in-flight reply so the
                # new utterance is answered instead (barge-in behaviour).
                if self.response_task is not None and not self.response_task.done():
                    self.response_task.cancel()
                self.response_task = asyncio.create_task(self.respond())

    async def respond(self) -> None:
        """Fetch an Ollama reply for pending text and stream it as speech via RTP.

        On cancellation (human started speaking) the partial user turn is
        removed from the chat history so the history stays consistent.
        """
        # Consume everything queued so far as a single user turn.
        self.messages.append({"role": "user", "content": "\n".join(self.pending_text)})
        self.pending_text.clear()
        try:
            response = await ollama.AsyncClient().chat(
                model=self.ollama_model,
                messages=self.messages,
            )
            # Strip non-ASCII output the TTS model cannot speak.
            reply = (response.message.content or "").encode("ascii", "ignore").decode()
            self.messages.append({"role": "assistant", "content": reply})
            logger.info("Agent reply: %r", reply)
            await self.send_speech(reply)
        except asyncio.CancelledError:
            # Remove the partial user turn so history stays consistent.
            # NOTE(review): the popped turn's text is NOT restored to
            # `pending_text`, so speech consumed by a cancelled turn is
            # dropped from the follow-up request — confirm this is intended.
            if self.messages and self.messages[-1]["role"] == "user":
                self.messages.pop()
            raise
        except Exception:
            logger.exception("Error while generating agent response")

    async def send_speech(self, text: str) -> None:
        """Stream synthesised speech from Pocket TTS and send via RTP.

        Yields audio chunks from
        `TTSModel.generate_audio_stream` as soon as they are decoded,
        enabling low-latency real-time delivery to the caller.

        Args:
            text: Text to synthesise and transmit.
        """
        loop = asyncio.get_running_loop()
        # `None` is the sentinel marking the end of the TTS stream.
        queue: asyncio.Queue[np.ndarray | None] = asyncio.Queue()

        def generate() -> None:
            # Runs on a worker thread: hand each decoded chunk to the event
            # loop via the queue.  `.result()` blocks the worker until the
            # put completes, providing back-pressure against the consumer.
            for chunk in self.tts_instance.generate_audio_stream(
                self.voice_state,
                text,  # type: ignore[too-many-positional-arguments]
            ):
                asyncio.run_coroutine_threadsafe(
                    queue.put(chunk.numpy()), loop
                ).result()
            asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()

        future = loop.run_in_executor(None, generate)
        while (tts_chunk := await queue.get()) is not None:
            # Convert from the TTS model's native rate to the codec rate.
            resampled = self.resample(
                tts_chunk, self.tts_instance.sample_rate, self.codec.sample_rate_hz
            )
            await self.send_rtp_audio(resampled)
        # Propagate any exception raised on the worker thread.
        await future

respond() async

Fetch an Ollama reply for pending text and stream it as speech via RTP.

On cancellation (human started speaking) the partial user turn is removed from the chat history so the history stays consistent.

Source code in voip/ai.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
async def respond(self) -> None:
    """Fetch an Ollama reply for pending text and stream it as speech via RTP.

    On cancellation (human started speaking) the partial user turn is
    removed from the chat history so the history stays consistent.
    """
    # Consume everything queued so far as a single user turn.
    self.messages.append({"role": "user", "content": "\n".join(self.pending_text)})
    self.pending_text.clear()
    try:
        response = await ollama.AsyncClient().chat(
            model=self.ollama_model,
            messages=self.messages,
        )
        # Strip non-ASCII output the TTS model cannot speak.
        reply = (response.message.content or "").encode("ascii", "ignore").decode()
        self.messages.append({"role": "assistant", "content": reply})
        logger.info("Agent reply: %r", reply)
        await self.send_speech(reply)
    except asyncio.CancelledError:
        # Remove the partial user turn so history stays consistent.
        # NOTE(review): the popped turn's text is NOT restored to
        # `pending_text`, so speech consumed by a cancelled turn is dropped
        # from the follow-up request — confirm this is intended.
        if self.messages and self.messages[-1]["role"] == "user":
            self.messages.pop()
        raise
    except Exception:
        logger.exception("Error while generating agent response")

send_speech(text) async

Stream synthesised speech from Pocket TTS and send via RTP.

Yields audio chunks from TTSModel.generate_audio_stream as soon as they are decoded, enabling low-latency real-time delivery to the caller.

Parameters:

Name Type Description Default
text str

Text to synthesise and transmit.

required
Source code in voip/ai.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
async def send_speech(self, text: str) -> None:
    """Stream synthesised speech from Pocket TTS and send via RTP.

    Yields audio chunks from
    `TTSModel.generate_audio_stream` as soon as they are decoded,
    enabling low-latency real-time delivery to the caller.

    Args:
        text: Text to synthesise and transmit.
    """
    loop = asyncio.get_running_loop()
    # `None` is the sentinel marking the end of the TTS stream.
    queue: asyncio.Queue[np.ndarray | None] = asyncio.Queue()

    def generate() -> None:
        # Runs on a worker thread: hand each decoded chunk to the event loop
        # via the queue.  `.result()` blocks the worker until the put
        # completes, providing back-pressure against the consumer.
        for chunk in self.tts_instance.generate_audio_stream(
            self.voice_state,
            text,  # type: ignore[too-many-positional-arguments]
        ):
            asyncio.run_coroutine_threadsafe(
                queue.put(chunk.numpy()), loop
            ).result()
        asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()

    future = loop.run_in_executor(None, generate)
    while (tts_chunk := await queue.get()) is not None:
        # Convert from the TTS model's native rate to the codec rate.
        resampled = self.resample(
            tts_chunk, self.tts_instance.sample_rate, self.codec.sample_rate_hz
        )
        await self.send_rtp_audio(resampled)
    # Propagate any exception raised on the worker thread.
    await future