Build Voice AI Agents with Pipecat

This guide shows you how to plug SFVoPI calls into a Pipecat pipeline so you can build voice AI bots that speak, listen, and respond in real time over a phone call.

You get a copy-paste-ready Python project that:

  1. Answers incoming SFVoPI calls with an answer-webhook.
  2. Tells SFVoPI where to stream audio (your WebSocket URL).
  3. Runs a Pipecat pipeline (SFVoPI → STT → LLM → TTS → SFVoPI) against the caller.
  4. Handles barge-in (caller interrupts the bot) automatically.

How It Fits

Pipecat splits wire transport from wire protocol. You reuse Pipecat's built-in FastAPIWebsocketTransport and plug in one custom class — SFVoPIFrameSerializer — that translates between SFVoPI WebSocket JSON and Pipecat Frame objects.

That's the entire integration surface. One file.


Prerequisites

  • Python 3.10+
  • An SFVoPI app with answer_url and fallback_answer_url configured — see Create App.
  • A public HTTPS URL for your server (use ngrok for local dev).
  • API keys for your STT / LLM / TTS providers. This guide uses Deepgram + OpenAI + Cartesia, but you can swap any Pipecat-supported service.

Codec + Sample Rate

SFVoPI runs A-law (PCMA) @ 8 kHz end-to-end. The Indian telephony stack is narrowband — every frame that reaches the caller is 8 kHz A-law regardless of what you ask for. Pick one of these two configs:

| Use case | codec | sample_rate | What SFVoPI does | Pipeline audio |
| --- | --- | --- | --- | --- |
| Default (recommended) | PCMA | 8000 | Zero transcode. Bytes pass straight through. | A-law decode in your serializer only |
| Linear PCM in pipeline | L16 | 8000 | One transcode per direction (A-law ↔ PCM) | Raw 8 kHz PCM, no A-law math |

Do NOT request sample rates > 8000

The trunk is 8 kHz A-law. Asking SFVoPI for 16 kHz just makes it upsample 8 kHz audio before handing to you and downsample your 16 kHz TTS before handing to the caller. Zero quality gain, pure CPU waste.

If your Pipecat pipeline wants 16 kHz PCM internally, do the resample on your own infra — the serializer below already handles this: alaw_to_pcm(raw, 8000, 16000, resampler) on ingest, and resamples back to 8 kHz on egress. SFVoPI stays a clean 8 kHz passthrough; your app controls its own internal rate.

Never use PCMU on SFVoPI

The SFVoPI telephony leg is A-law. Picking PCMU forces a pointless μ-law ↔ A-law roundtrip on every frame. Zero quality gain, double CPU cost.

The Roundtrip Rule

Whatever codec + sample_rate you put in your answer-webhook response, use the exact same values in every playAudio frame. Mismatch = garbled audio + extra transcoding.

answer-webhook:  { "codec": "PCMA", "sample_rate": 8000 }
every playAudio: { "contentType": "audio/PCMA", "sampleRate": 8000 }
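
One way to keep the two sides in sync is to derive both from a single object. This is a minimal sketch: the AudioFormat class and its method names are hypothetical helpers, not part of SFVoPI or Pipecat, and the content-type strings are the ones the serializer in Step 1 uses.

```python
from dataclasses import dataclass

# Content types as used by the serializer in this guide.
CONTENT_TYPES = {"PCMA": "audio/PCMA", "L16": "audio/x-l16"}


@dataclass(frozen=True)
class AudioFormat:
    """Hypothetical single source of truth for codec + sample rate."""

    codec: str = "PCMA"
    sample_rate: int = 8000

    def answer_fields(self) -> dict:
        # Fields for the "stream" object in the answer-webhook response.
        return {"codec": self.codec, "sample_rate": self.sample_rate}

    def play_audio_fields(self) -> dict:
        # Fields for the "media" object of every playAudio frame.
        return {
            "contentType": CONTENT_TYPES[self.codec],
            "sampleRate": self.sample_rate,
        }


fmt = AudioFormat()
print(fmt.answer_fields())
print(fmt.play_audio_fields())
```

Because both dictionaries come from the same instance, changing the codec in one place changes it everywhere.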

PCMA vs L16 — Which?

  • PCMA 8000 — fewest moving parts, zero SFVoPI transcode. Serializer does pcm_to_alaw / alaw_to_pcm. Pick this unless you have a reason not to.
  • L16 8000 — SFVoPI transcodes A-law ↔ PCM once per direction, but you get raw PCM in the WebSocket. Serializer is just base64.b64encode(pcm). Easier to debug, marginally more CPU on SFVoPI.
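
To put rough numbers on the trade-off, the sketch below computes per-frame payload sizes for both options. The 20 ms frame duration is an assumption chosen for illustration; this guide does not state SFVoPI's actual frame size. PCMA uses one byte per sample and L16 uses two.

```python
import base64

BYTES_PER_SAMPLE = {"PCMA": 1, "L16": 2}


def payload_sizes(codec: str, sample_rate: int = 8000, frame_ms: int = 20):
    """Return (raw_bytes, base64_chars) for one audio frame.

    frame_ms=20 is an illustrative assumption, not an SFVoPI constant.
    """
    samples = sample_rate * frame_ms // 1000
    raw = samples * BYTES_PER_SAMPLE[codec]
    # Encode a dummy buffer of the right length to measure base64 growth.
    b64 = len(base64.b64encode(bytes(raw)))
    return raw, b64


for codec in ("PCMA", "L16"):
    print(codec, payload_sizes(codec))
```

Under that assumption, L16 roughly doubles the WebSocket payload per frame compared with PCMA.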

Quick Rule

Default for India telephony → PCMA 8000. Ignore L16 unless you specifically want linear PCM in the wire format.


Install

pip install "pipecat-ai[deepgram,openai,cartesia,silero,webrtc]" \
    fastapi uvicorn python-dotenv numpy "audioop-lts; python_version >= '3.13'"

audioop-lts provides the μ-law / A-law codec helpers on Python 3.13+, where audioop was removed from the standard library. On Python 3.10–3.12, audioop is still part of the stdlib and audioop-lts only supports 3.13+, so the environment marker makes pip skip it there.

Create a .env file:

DEEPGRAM_API_KEY=...
OPENAI_API_KEY=...
CARTESIA_API_KEY=...

Project Structure

my-sfvopi-bot/
├── .env
├── main.py # FastAPI app: answer webhook + WS handler
└── sfvopi_serializer.py # The one custom file you need

Step 1 — The SFVoPI Frame Serializer

This class is the only SFVoPI-specific code in the project. It maps:

| SFVoPI event | Pipecat frame |
| --- | --- |
| Incoming start | Captured for streamId / callId before the pipeline starts (see Step 2) |
| Incoming media | InputAudioRawFrame (PCM, resampled to pipeline rate) |
| Incoming dtmf | InputDTMFFrame |
| Incoming clearedAudio | InputTransportMessageFrame (for ack handling) |
| Incoming playedStream | InputTransportMessageFrame (for ack handling) |
| Outgoing playAudio | From OutputAudioRawFrame (PCM → A-law or raw L16 → base64) |
| Outgoing clearAudio | From InterruptionFrame (VAD barge-in) |
| Outgoing checkpoint | From OutputTransportMessageFrame |

Save as sfvopi_serializer.py:

# sfvopi_serializer.py
from __future__ import annotations

import base64
import json
from typing import Literal, Optional

from pydantic import BaseModel

from pipecat.audio.utils import (
    alaw_to_pcm,
    create_stream_resampler,
    pcm_to_alaw,
    ulaw_to_pcm,
)
from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    InputAudioRawFrame,
    InputDTMFFrame,
    InputTransportMessageFrame,
    InterruptionFrame,
    KeypadEntry,
    OutputAudioRawFrame,
    OutputTransportMessageFrame,
    StartFrame,
    TransportMessageFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType


class SFVoPIFrameSerializer(FrameSerializer):
    """Serializer that speaks the SFVoPI WebSocket protocol."""

    class InputParams(BaseModel):
        # Must match the codec / sample_rate you return from the answer webhook.
        # See the "Codec + Sample Rate" section above.
        # - PCMA 8000 → default, zero SFVoPI transcoding.
        # - L16 8000 → raw PCM in WebSocket, SFVoPI transcodes A-law ↔ PCM.
        # Don't request rates > 8000: the trunk is 8 kHz, anything else is upsampled waste.
        codec: Literal["PCMA", "L16"] = "PCMA"
        sfvopi_sample_rate: Literal[8000] = 8000

    def __init__(
        self,
        stream_id: str,
        call_id: str,
        params: Optional[InputParams] = None,
    ):
        self._stream_id = stream_id
        self._call_id = call_id
        self._params = params or SFVoPIFrameSerializer.InputParams()

        self._pipeline_sample_rate: int = 16000  # overwritten by setup()
        self._sfvopi_sample_rate: int = self._params.sfvopi_sample_rate
        self._clear_seq = 0

        self._input_resampler = create_stream_resampler()
        self._output_resampler = create_stream_resampler()

    @property
    def type(self) -> FrameSerializerType:
        return FrameSerializerType.TEXT

    async def setup(self, frame: StartFrame) -> None:
        # StartFrame carries the pipeline's sample rate (usually 16 kHz).
        self._pipeline_sample_rate = frame.audio_in_sample_rate

    # ---------- Pipecat Frame → SFVoPI JSON ----------
    async def serialize(self, frame: Frame) -> Optional[str]:
        if isinstance(frame, (EndFrame, CancelFrame)):
            # Let the WS close naturally; SFVoPI will fire the hangup webhook.
            return None

        if isinstance(frame, InterruptionFrame):
            self._clear_seq += 1
            return json.dumps({
                "event": "clearAudio",
                "sequenceNumber": self._clear_seq,
            })

        if isinstance(frame, OutputAudioRawFrame):
            pcm = frame.audio
            if self._params.codec == "PCMA":
                encoded = await pcm_to_alaw(
                    pcm,
                    frame.sample_rate,
                    self._sfvopi_sample_rate,
                    self._output_resampler,
                )
                content_type = "audio/PCMA"
            else:  # L16: raw 16-bit signed little-endian PCM, no codec conversion
                if frame.sample_rate != self._sfvopi_sample_rate:
                    # Only resample if pipeline rate differs from SFVoPI rate.
                    encoded = await self._output_resampler.resample(
                        pcm,
                        frame.sample_rate,
                        self._sfvopi_sample_rate,
                    )
                else:
                    encoded = pcm  # already at the SFVoPI rate
                content_type = "audio/x-l16"

            if not encoded:
                return None

            return json.dumps({
                "event": "playAudio",
                "media": {
                    "payload": base64.b64encode(encoded).decode("utf-8"),
                    "contentType": content_type,
                    "sampleRate": self._sfvopi_sample_rate,
                },
            })

        if isinstance(frame, (OutputTransportMessageFrame, TransportMessageFrame)):
            # Pass through any app-level JSON message (e.g. checkpoint).
            payload = frame.message
            if isinstance(payload, dict):
                return json.dumps(payload)
            return str(payload)

        return None

    # ---------- SFVoPI JSON → Pipecat Frame ----------
    async def deserialize(self, data: str | bytes) -> Optional[Frame]:
        try:
            msg = json.loads(data)
        except (ValueError, TypeError):
            return None
        if not isinstance(msg, dict):
            return None

        event = msg.get("event")

        if event == "media":
            media = msg.get("media", {})
            payload_b64 = media.get("payload")
            if not payload_b64:
                return None

            raw = base64.b64decode(payload_b64)
            # If contentType is missing, assume the codec we asked for.
            default_type = "audio/PCMA" if self._params.codec == "PCMA" else "audio/x-l16"
            content_type = media.get("contentType", default_type)
            in_rate = media.get("sampleRate", self._sfvopi_sample_rate)

            if content_type == "audio/PCMA":
                pcm = await alaw_to_pcm(raw, in_rate, self._pipeline_sample_rate, self._input_resampler)
            elif content_type == "audio/x-l16":
                # Already raw 16-bit signed LE PCM; resample only if needed.
                if in_rate != self._pipeline_sample_rate:
                    pcm = await self._input_resampler.resample(raw, in_rate, self._pipeline_sample_rate)
                else:
                    pcm = raw
            elif content_type == "audio/PCMU":
                # Not expected on SFVoPI (see "Never use PCMU"); decoded defensively.
                pcm = await ulaw_to_pcm(raw, in_rate, self._pipeline_sample_rate, self._input_resampler)
            else:
                return None

            return InputAudioRawFrame(
                audio=pcm,
                num_channels=1,
                sample_rate=self._pipeline_sample_rate,
            )

        if event == "dtmf":
            digit = msg.get("digit")
            try:
                return InputDTMFFrame(KeypadEntry(digit))
            except ValueError:
                return None

        if event in ("clearedAudio", "playedStream"):
            # Surface acks to the pipeline so app processors can react.
            return InputTransportMessageFrame(message=msg)

        # "start" is consumed before the pipeline begins (see main.py).
        return None
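
To sanity-check the wire format without running Pipecat, you can build and parse a media event by hand. The sketch below uses only the field names the serializer reads (event, media.payload, media.contentType, media.sampleRate). The helper names are hypothetical and the 160-byte payload is made-up test data; 0xD5 is the A-law code for a zero sample.

```python
import base64
import json


def build_media_event(audio: bytes, content_type: str = "audio/PCMA",
                      sample_rate: int = 8000) -> str:
    """Build a JSON message shaped like an inbound SFVoPI media event."""
    return json.dumps({
        "event": "media",
        "media": {
            "payload": base64.b64encode(audio).decode("ascii"),
            "contentType": content_type,
            "sampleRate": sample_rate,
        },
    })


def parse_media_event(data: str):
    """Return (raw_bytes, content_type, sample_rate), or None if not media."""
    msg = json.loads(data)
    if msg.get("event") != "media":
        return None
    media = msg.get("media", {})
    payload = media.get("payload")
    if not payload:
        return None
    return base64.b64decode(payload), media.get("contentType"), media.get("sampleRate")


silence = bytes([0xD5]) * 160  # made-up frame of A-law silence
raw, ctype, rate = parse_media_event(build_media_event(silence))
print(len(raw), ctype, rate)
```

Feeding the output of build_media_event into the serializer's deserialize method is one way to unit-test it offline.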

Step 2 — Read the start Event

SFVoPI sends a start event right after the WebSocket opens, carrying streamId and callId. The serializer takes those values in its constructor, so we read that first message manually before building it. The full main.py in Step 3 already includes this helper:

# in main.py
import json
from fastapi import WebSocket


async def _read_start(websocket: WebSocket) -> dict:
    """Block until the SFVoPI 'start' event arrives. Return its payload."""
    while True:
        raw = await websocket.receive_text()
        try:
            msg = json.loads(raw)
        except ValueError:
            continue
        if msg.get("event") == "start":
            return msg
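
The read loop can be exercised locally without a live call. This sketch mirrors the helper above and drives it with a hypothetical FakeWebSocket stub that exposes only receive_text(); the streamId and callId values are invented for the test.

```python
import asyncio
import json


class FakeWebSocket:
    """Hypothetical stand-in exposing only receive_text(), for local tests."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def receive_text(self) -> str:
        return self._messages.pop(0)


async def read_start(websocket) -> dict:
    # Same loop as the helper above: skip anything that is not a start event.
    while True:
        raw = await websocket.receive_text()
        try:
            msg = json.loads(raw)
        except ValueError:
            continue
        if isinstance(msg, dict) and msg.get("event") == "start":
            return msg


ws = FakeWebSocket([
    "not json",
    json.dumps({"event": "media", "media": {}}),
    json.dumps({"event": "start", "streamId": "s-1", "callId": "c-1"}),
])
start = asyncio.run(read_start(ws))
print(start["streamId"], start["callId"])
```

Note that anything received before the start event is discarded, which is acceptable only because the pipeline has not started yet.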

Step 3 — FastAPI Server

Save as main.py. This runs everything: the answer webhook, the WebSocket handler, and the Pipecat pipeline.

# main.py
import json
import os

from dotenv import load_dotenv
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import JSONResponse

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

from sfvopi_serializer import SFVoPIFrameSerializer

load_dotenv()

PUBLIC_WSS_URL = os.environ.get("PUBLIC_WSS_URL", "wss://your-server.example.com/ws")

# --- Codec + sample rate ---
# SFVoPI / Indian telephony stack runs A-law @ 8 kHz end-to-end.
# "PCMA", 8000 → default, zero SFVoPI transcoding.
# "L16", 8000 → raw PCM in WebSocket, SFVoPI transcodes A-law ↔ PCM.
# Don't request rates > 8000 — trunk is 8 kHz, anything else is wasted CPU.
# Don't use "PCMU" — pointless A-law↔μ-law roundtrip.
SFVOPI_CODEC = "PCMA"
SFVOPI_SAMPLE_RATE = 8000

app = FastAPI()


# ---------- Answer Webhook ----------
@app.post("/webhook/answer")
async def answer_webhook(request: Request):
    payload = await request.json()
    print(f"[answer] call_uuid={payload.get('call_uuid')} from={payload.get('from')}")

    return JSONResponse({
        "stream": {
            "url": PUBLIC_WSS_URL,
            "codec": SFVOPI_CODEC,
            "sample_rate": SFVOPI_SAMPLE_RATE,
            "direction": "BOTH",
            "extra_headers": {
                "X-Call-UUID": payload.get("call_uuid", ""),
            },
        }
    })


# ---------- Hangup Webhook (optional) ----------
@app.post("/webhook/hangup")
async def hangup_webhook(request: Request):
    payload = await request.json()
    print(f"[hangup] call_uuid={payload.get('call_uuid')} status={payload.get('call_status')}")
    return {"ok": True}


# ---------- WebSocket: the Pipecat pipeline ----------
@app.websocket("/ws")
async def sfvopi_ws(websocket: WebSocket):
    await websocket.accept()

    # 1. Read SFVoPI start event
    start = await _read_start(websocket)
    stream_id = start["streamId"]
    call_id = start["callId"]
    print(f"[ws] stream started streamId={stream_id} callId={call_id}")

    # 2. Build the serializer
    serializer = SFVoPIFrameSerializer(
        stream_id=stream_id,
        call_id=call_id,
        params=SFVoPIFrameSerializer.InputParams(
            codec=SFVOPI_CODEC,
            sfvopi_sample_rate=SFVOPI_SAMPLE_RATE,
        ),
    )

    # 3. Build the transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_analyzer=SileroVADAnalyzer(),  # enables barge-in
            serializer=serializer,
        ),
    )

    # 4. Build the pipeline
    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        model="gpt-4o-mini",
    )
    tts = CartesiaTTSService(
        api_key=os.environ["CARTESIA_API_KEY"],
        voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",
    )

    context = OpenAILLMContext([
        {
            "role": "system",
            "content": (
                "You are a friendly phone receptionist. "
                "Keep replies under 2 sentences. "
                "Start by greeting the caller and asking how you can help."
            ),
        },
    ])
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=16000,
            audio_out_sample_rate=16000,
            allow_interruptions=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def _greet(transport, client):
        # Nudge the LLM to produce the opening line immediately.
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    await PipelineRunner(handle_sigint=False).run(task)


async def _read_start(websocket: WebSocket) -> dict:
    while True:
        raw = await websocket.receive_text()
        try:
            msg = json.loads(raw)
        except ValueError:
            continue
        if msg.get("event") == "start":
            return msg

Step 4 — Run It

Local dev with ngrok:

# terminal 1
uvicorn main:app --host 0.0.0.0 --port 3000

# terminal 2
ngrok http 3000

Take the https://<id>.ngrok-free.app URL ngrok prints and set two things:

  • In your .env: PUBLIC_WSS_URL=wss://<id>.ngrok-free.app/ws
  • In your SFVoPI app (Update App):
    • answer_url: https://<id>.ngrok-free.app/webhook/answer
    • hangup_url: https://<id>.ngrok-free.app/webhook/hangup
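
All three values share the same host, so they can be derived from the ngrok URL in one step. A small sketch, assuming the /ws, /webhook/answer and /webhook/hangup paths used in main.py; derive_urls is a hypothetical helper and the host in the example is a placeholder.

```python
from urllib.parse import urlsplit, urlunsplit


def derive_urls(public_https_url: str) -> dict:
    """Derive the WebSocket and webhook URLs from one public https:// URL."""
    parts = urlsplit(public_https_url)
    if parts.scheme != "https" or not parts.netloc:
        raise ValueError("expected an https:// URL")
    host = parts.netloc
    return {
        # The stream URL must use wss://, not https://.
        "PUBLIC_WSS_URL": urlunsplit(("wss", host, "/ws", "", "")),
        "answer_url": urlunsplit(("https", host, "/webhook/answer", "", "")),
        "hangup_url": urlunsplit(("https", host, "/webhook/hangup", "", "")),
    }


for key, value in derive_urls("https://example-id.ngrok-free.app").items():
    print(f"{key}={value}")
```

Generating the values this way avoids the common slip of pasting an https:// URL into PUBLIC_WSS_URL.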

Now call any number linked to your SFVoPI app. You should hear the bot greet you.


How Barge-In Works

Pipecat's SileroVADAnalyzer watches the caller's audio. The moment the caller starts speaking while the bot is talking, the pipeline emits an InterruptionFrame. The serializer converts it to:

{ "event": "clearAudio", "sequenceNumber": 1 }

SFVoPI drops the queued bot audio instantly and sends back:

{ "event": "clearedAudio", "sequenceNumber": 1 }

No extra code in your app. It just works.
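
If you want to confirm that acks arrive, or measure how long SFVoPI takes to flush audio, you can pair each clearAudio with its clearedAudio by sequenceNumber. This is an optional sketch; ClearAudioTracker is a hypothetical helper, and where you call it from (for example a FrameProcessor watching InputTransportMessageFrame) is up to your app.

```python
import time
from typing import Optional


class ClearAudioTracker:
    """Hypothetical helper that pairs clearAudio sends with clearedAudio acks."""

    def __init__(self) -> None:
        self._sent = {}  # sequenceNumber -> monotonic send time

    def on_clear_sent(self, sequence_number: int) -> None:
        self._sent[sequence_number] = time.monotonic()

    def on_message(self, msg: dict) -> Optional[float]:
        """Return ack latency in seconds, or None if msg is not a known ack."""
        if msg.get("event") != "clearedAudio":
            return None
        sent_at = self._sent.pop(msg.get("sequenceNumber"), None)
        if sent_at is None:
            return None
        return time.monotonic() - sent_at


tracker = ClearAudioTracker()
tracker.on_clear_sent(1)
latency = tracker.on_message({"event": "clearedAudio", "sequenceNumber": 1})
print(f"ack latency: {latency:.6f}s")
```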


Handling DTMF (Keypad Presses)

InputDTMFFrames flow through the pipeline. Wire a tiny processor ahead of the LLM to act on them:

from pipecat.frames.frames import InputDTMFFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class DTMFRouter(FrameProcessor):
    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, InputDTMFFrame):
            digit = frame.button.value
            print(f"[dtmf] caller pressed {digit}")
            if digit == "0":
                # e.g. transfer to a human, switch assistant, ...
                pass
            return  # swallow the DTMF frame

        await self.push_frame(frame, direction)

Insert it right after transport.input() in the pipeline.
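
Single key presses are often not enough; callers usually enter a multi-digit value and finish with #. The sketch below buffers digits until a terminator arrives. DTMFCollector is a hypothetical helper, and the # terminator and 16-digit cap are assumptions; inside DTMFRouter you would call collector.feed(frame.button.value).

```python
from typing import List, Optional


class DTMFCollector:
    """Hypothetical helper: buffer keypad digits until a terminator key."""

    def __init__(self, terminator: str = "#", max_digits: int = 16) -> None:
        self._terminator = terminator
        self._max_digits = max_digits
        self._buffer: List[str] = []

    def feed(self, digit: str) -> Optional[str]:
        """Add one digit. Return the full entry when complete, else None."""
        if digit == self._terminator:
            entry = "".join(self._buffer)
            self._buffer.clear()
            return entry or None  # a bare terminator yields nothing
        self._buffer.append(digit)
        if len(self._buffer) >= self._max_digits:
            # Cap reached: return what we have rather than grow unbounded.
            entry = "".join(self._buffer)
            self._buffer.clear()
            return entry
        return None


collector = DTMFCollector()
for key in "4821#":
    result = collector.feed(key)
print(result)
```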


Sending a Checkpoint

To know when a specific piece of audio has finished playing (e.g., the opening greeting), push an OutputTransportMessageFrame:

from pipecat.frames.frames import OutputTransportMessageFrame

await task.queue_frames([
    OutputTransportMessageFrame(message={
        "event": "checkpoint",
        "streamId": stream_id,
        "name": "greeting_complete",
    }),
])

When the audio before the checkpoint finishes, SFVoPI sends playedStream with name: "greeting_complete". The serializer surfaces it as an InputTransportMessageFrame — write a small FrameProcessor to watch for it.
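
The waiting side can be kept independent of Pipecat. In this sketch, CheckpointWaiter is a hypothetical helper: your FrameProcessor would pass each InputTransportMessageFrame's message dict to on_message, and any coroutine can await a named checkpoint with a timeout. The demo simulates the ack with a timer instead of a real call.

```python
import asyncio
from typing import Dict


class CheckpointWaiter:
    """Hypothetical helper: await a named playedStream ack."""

    def __init__(self) -> None:
        self._events: Dict[str, asyncio.Event] = {}

    def _event(self, name: str) -> asyncio.Event:
        return self._events.setdefault(name, asyncio.Event())

    def on_message(self, msg: dict) -> None:
        # Call this with the message dict of each inbound transport message.
        if msg.get("event") == "playedStream" and "name" in msg:
            self._event(msg["name"]).set()

    async def wait(self, name: str, timeout: float = 10.0) -> bool:
        """Return True once the checkpoint was played, False on timeout."""
        try:
            await asyncio.wait_for(self._event(name).wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def demo() -> bool:
    waiter = CheckpointWaiter()
    # Simulate SFVoPI's ack arriving shortly after we start waiting.
    asyncio.get_running_loop().call_later(
        0.01,
        waiter.on_message,
        {"event": "playedStream", "name": "greeting_complete"},
    )
    return await waiter.wait("greeting_complete", timeout=1.0)


print(asyncio.run(demo()))
```

The timeout matters: if the caller hangs up mid-greeting, the ack may never arrive.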


Swapping Services

Any Pipecat-compatible STT / LLM / TTS works. For example, ElevenLabs TTS instead of Cartesia:

from pipecat.services.elevenlabs.tts import ElevenLabsTTSService

tts = ElevenLabsTTSService(
    api_key=os.environ["ELEVENLABS_API_KEY"],
    voice_id="...",
)

No other changes.


Production Checklist

  • Run uvicorn behind a reverse proxy (nginx, Caddy) with a real TLS cert — SFVoPI requires wss://.
  • Set stream_timeout in your answer-webhook response to match your expected call length.
  • Use extra_headers to pass X-Call-UUID so your WS handler can correlate calls in logs.
  • Run multiple workers (uvicorn --workers N) — each WebSocket pins to one process, so scale horizontally.
  • Cache the SileroVADAnalyzer model on first boot if cold-start latency matters.
  • Handle the hangup webhook to stop any per-call billing / resources.
  • Keep LLM replies short. Phone users hate monologues.

Troubleshooting

"Call connects but bot is silent." Check PUBLIC_WSS_URL uses wss:// (not https://) and points at /ws. Look for the [ws] stream started log line.

"Bot audio sounds like chipmunks or robot." Codec or sample-rate mismatch. Three things must agree:

  1. The codec + sample_rate you return from the answer-webhook.
  2. SFVOPI_CODEC + SFVOPI_SAMPLE_RATE in main.py.
  3. Every playAudio frame you emit must use the same contentType + sampleRate.

The serializer handles resampling from the Pipecat pipeline rate (16 kHz) down to SFVoPI's 8 kHz automatically — keep audio_in_sample_rate=16000 and audio_out_sample_rate=16000 as shown. See "Codec + Sample Rate" above.

"Audio is garbled / staticky in one direction only." You're probably sending the wrong codec on playAudio (e.g. SFVoPI asked for PCMA but you sent PCMU). The bytes decode as legal audio but interpret wrong — classic symptom. Recheck SFVOPI_CODEC.

"I asked for 16 kHz but STT accuracy didn't improve." It can't. Trunk is 8 kHz — no real 16 kHz source exists. Keep sample_rate: 8000 in the answer-webhook. If your Pipecat pipeline needs 16 kHz internally, resample on your side (the serializer already does this in alaw_to_pcm / pcm_to_alaw). Your infra, your CPU — not SFVoPI's.

"Bot talks over the caller." Make sure vad_analyzer=SileroVADAnalyzer() is set on FastAPIWebsocketParams and allow_interruptions=True is set on PipelineParams.

"Webhook times out." SFVoPI gives the answer webhook 10 seconds. Don't do slow work in the handler — just return the Stream JSON. Heavy setup belongs in the /ws handler.

"DTMF never fires." The caller must press keys after the WebSocket is connected. DTMF tones played before the stream starts are delivered as RFC-2833 events to the carrier and don't reach your app.