Build Voice AI Agents with Pipecat
This guide shows you how to plug SFVoPI calls into a Pipecat pipeline so you can build voice AI bots that speak, listen, and respond in real time over a phone call.
You get a copy-paste-ready Python project that:
- Answers incoming SFVoPI calls with an answer-webhook.
- Tells SFVoPI where to stream audio (your WebSocket URL).
- Runs a Pipecat pipeline — SFVoPI ↔ STT ↔ LLM ↔ TTS ↔ SFVoPI — against the caller.
- Handles barge-in (caller interrupts the bot) automatically.
How It Fits
Pipecat splits wire transport from wire protocol. You reuse Pipecat's built-in FastAPIWebsocketTransport and plug in one custom class — SFVoPIFrameSerializer — that translates between SFVoPI WebSocket JSON and Pipecat Frame objects.
That's the entire integration surface. One file.
Prerequisites
- Python 3.10+
- An SFVoPI app with answer_url and fallback_answer_url configured (see Create App).
- A public HTTPS URL for your server (use ngrok for local dev).
- API keys for your STT / LLM / TTS providers. This guide uses Deepgram + OpenAI + Cartesia, but you can swap any Pipecat-supported service.
Codec + Sample Rate
SFVoPI runs A-law (PCMA) @ 8 kHz end-to-end. The Indian telephony stack is narrowband — every frame that reaches the caller is 8 kHz A-law regardless of what you ask for. Pick one of these two configs:
| Use case | codec | sample_rate | What SFVoPI does | Pipeline audio |
|---|---|---|---|---|
| Default (recommended) | PCMA | 8000 | Zero transcode. Bytes pass straight through. | A-law encode/decode in your serializer only |
| Linear PCM in pipeline | L16 | 8000 | One transcode per direction (A-law ↔ PCM) | Raw 8 kHz PCM, no companding math in your code |
The trunk is 8 kHz A-law. Asking SFVoPI for 16 kHz just makes it upsample 8 kHz audio before handing to you and downsample your 16 kHz TTS before handing to the caller. Zero quality gain, pure CPU waste.
If your Pipecat pipeline wants 16 kHz PCM internally, do the resample on your own infra — the serializer below already handles this: alaw_to_pcm(raw, 8000, 16000, resampler) on ingest, and resamples back to 8 kHz on egress. SFVoPI stays a clean 8 kHz passthrough; your app controls its own internal rate.
The SFVoPI telephony leg is A-law. Picking PCMU forces a pointless μ-law ↔ A-law roundtrip on every frame. Zero quality gain, double CPU cost.
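To put numbers on the configurations above, the following sketch computes the payload size of a single audio frame. The 20 ms frame duration is an assumption chosen for illustration (this guide does not state SFVoPI's frame size), and `frame_bytes` and `base64_chars` are hypothetical helper names.

```python
import base64

# Bytes per sample: A-law is 8-bit companded audio, L16 is 16-bit linear PCM.
BYTES_PER_SAMPLE = {"PCMA": 1, "L16": 2}


def frame_bytes(codec: str, sample_rate: int, frame_ms: int = 20) -> int:
    """Raw payload size of one frame, before base64 encoding."""
    samples = sample_rate * frame_ms // 1000
    return samples * BYTES_PER_SAMPLE[codec]


def base64_chars(raw_len: int) -> int:
    """Length of the base64 text that carries raw_len bytes in the JSON payload."""
    return len(base64.b64encode(bytes(raw_len)))


for codec, rate in [("PCMA", 8000), ("L16", 8000), ("L16", 16000)]:
    raw = frame_bytes(codec, rate)
    print(f"{codec} @ {rate} Hz: {raw} raw bytes, {base64_chars(raw)} base64 chars")
```

Under that assumption, L16 at 16 kHz carries four times the bytes of PCMA at 8 kHz for audio that originated on the same 8 kHz trunk.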
The Roundtrip Rule
Whatever codec + sample_rate you put in your answer-webhook response, use the exact same values in every playAudio frame. Mismatch = garbled audio + extra transcoding.
answer-webhook: { "codec": "PCMA", "sample_rate": 8000 }
every playAudio: { "contentType": "audio/PCMA", "sampleRate": 8000 }
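The rule can be checked mechanically before a frame leaves your process. This is a minimal stdlib-only sketch: `check_roundtrip` and `CONTENT_TYPES` are hypothetical names, and the `audio/x-l16` content type for L16 is taken from the serializer in Step 1.

```python
from typing import Dict, List

# Answer-webhook codec name -> contentType expected on every playAudio frame.
CONTENT_TYPES: Dict[str, str] = {"PCMA": "audio/PCMA", "L16": "audio/x-l16"}


def check_roundtrip(stream_cfg: dict, play_audio: dict) -> List[str]:
    """Return the mismatches between the webhook config and one playAudio frame."""
    problems = []
    media = play_audio.get("media", {})
    expected_type = CONTENT_TYPES.get(stream_cfg["codec"])
    if expected_type is None:
        problems.append(f"unsupported codec {stream_cfg['codec']!r}")
    elif media.get("contentType") != expected_type:
        problems.append(f"contentType {media.get('contentType')!r} != {expected_type!r}")
    if media.get("sampleRate") != stream_cfg["sample_rate"]:
        problems.append(f"sampleRate {media.get('sampleRate')!r} != {stream_cfg['sample_rate']!r}")
    return problems


cfg = {"codec": "PCMA", "sample_rate": 8000}
good = {"event": "playAudio", "media": {"contentType": "audio/PCMA", "sampleRate": 8000}}
bad = {"event": "playAudio", "media": {"contentType": "audio/PCMU", "sampleRate": 16000}}
print(check_roundtrip(cfg, good))  # empty list: nothing to fix
print(check_roundtrip(cfg, bad))   # one entry per mismatched field
```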
PCMA vs L16 — Which?
- PCMA 8000: fewest moving parts, zero SFVoPI transcode. Serializer does pcm_to_alaw / alaw_to_pcm. Pick this unless you have a reason not to.
- L16 8000: SFVoPI transcodes A-law ↔ PCM once per direction, but you get raw PCM in the WebSocket. Serializer is just base64.b64encode(pcm). Easier to debug, marginally more CPU on SFVoPI.
Quick Rule
Default for India telephony → PCMA 8000. Ignore L16 unless you specifically want linear PCM in the wire format.
Install
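pip install "pipecat-ai[deepgram,openai,cartesia,silero,webrtc]" \
  fastapi uvicorn python-dotenv numpy \
  "audioop-lts; python_version >= '3.13'"
audioop-lts provides the μ-law / A-law codec helpers on Python 3.13+, where the stdlib audioop module was removed. On Python 3.10–3.12, audioop is still part of the stdlib, so the environment marker tells pip to skip the backport there (the package is published only for Python 3.13 and later).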
Create a .env file:
DEEPGRAM_API_KEY=...
OPENAI_API_KEY=...
CARTESIA_API_KEY=...
Project Structure
my-sfvopi-bot/
├── .env
├── main.py # FastAPI app: answer webhook + WS handler
└── sfvopi_serializer.py # The one custom file you need
Step 1 — The SFVoPI Frame Serializer
This class is the only SFVoPI-specific code in the project. It maps:
| SFVoPI event | Pipecat frame |
|---|---|
| Incoming start | Captured for streamId / callId (handled by the start-event helper in Step 2) |
| Incoming media | InputAudioRawFrame (PCM, resampled to pipeline rate) |
| Incoming dtmf | InputDTMFFrame |
| Incoming clearedAudio | InputTransportMessageFrame (for ack handling) |
| Incoming playedStream | InputTransportMessageFrame (for ack handling) |
| Outgoing playAudio | From OutputAudioRawFrame (PCM → A-law or L16 → base64) |
| Outgoing clearAudio | From InterruptionFrame (VAD barge-in) |
| Outgoing checkpoint | From OutputTransportMessageFrame |
Save as sfvopi_serializer.py:
# sfvopi_serializer.py
from __future__ import annotations

import base64
import json
from typing import Literal, Optional

from pydantic import BaseModel

from pipecat.audio.utils import (
    alaw_to_pcm,
    create_stream_resampler,
    pcm_to_alaw,
    ulaw_to_pcm,
)
from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    InputAudioRawFrame,
    InputDTMFFrame,
    InputTransportMessageFrame,
    InterruptionFrame,
    KeypadEntry,
    OutputAudioRawFrame,
    OutputTransportMessageFrame,
    StartFrame,
    TransportMessageFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType


class SFVoPIFrameSerializer(FrameSerializer):
    """Serializer that speaks the SFVoPI WebSocket protocol."""

    class InputParams(BaseModel):
        # Must match the codec / sample_rate you return from the answer webhook.
        # See the "Codec + Sample Rate" section above.
        #   - PCMA 8000 → default, zero SFVoPI transcoding.
        #   - L16 8000 → raw PCM in WebSocket, SFVoPI transcodes A-law ↔ PCM.
        # Don't request rates > 8000: the trunk is 8 kHz, so anything else is upsampled waste.
        codec: Literal["PCMA", "L16"] = "PCMA"
        sfvopi_sample_rate: Literal[8000] = 8000

    def __init__(
        self,
        stream_id: str,
        call_id: str,
        params: Optional[InputParams] = None,
    ):
        self._stream_id = stream_id
        self._call_id = call_id
        self._params = params or SFVoPIFrameSerializer.InputParams()
        self._pipeline_sample_rate: int = 16000  # overwritten by setup()
        self._sfvopi_sample_rate: int = self._params.sfvopi_sample_rate
        self._clear_seq = 0
        self._input_resampler = create_stream_resampler()
        self._output_resampler = create_stream_resampler()

    @property
    def type(self) -> FrameSerializerType:
        return FrameSerializerType.TEXT

    async def setup(self, frame: StartFrame) -> None:
        # StartFrame carries the pipeline's sample rate (usually 16 kHz).
        self._pipeline_sample_rate = frame.audio_in_sample_rate

    # ---------- Pipecat Frame → SFVoPI JSON ----------
    async def serialize(self, frame: Frame) -> Optional[str]:
        if isinstance(frame, (EndFrame, CancelFrame)):
            # Let the WS close naturally; SFVoPI will fire the hangup webhook.
            return None

        if isinstance(frame, InterruptionFrame):
            self._clear_seq += 1
            return json.dumps({
                "event": "clearAudio",
                "sequenceNumber": self._clear_seq,
            })

        if isinstance(frame, OutputAudioRawFrame):
            pcm = frame.audio
            if self._params.codec == "PCMA":
                encoded = await pcm_to_alaw(
                    pcm,
                    frame.sample_rate,
                    self._sfvopi_sample_rate,
                    self._output_resampler,
                )
                content_type = "audio/PCMA"
            else:  # L16: raw 16-bit signed little-endian PCM, no codec conversion
                if frame.sample_rate != self._sfvopi_sample_rate:
                    # Resample only when the pipeline rate differs from the SFVoPI rate.
                    # Assumes the stream resampler exposes resample(audio, in_rate, out_rate),
                    # the same call Pipecat's codec helpers use internally.
                    encoded = await self._output_resampler.resample(
                        pcm, frame.sample_rate, self._sfvopi_sample_rate
                    )
                else:
                    encoded = pcm  # pass through unchanged
                content_type = "audio/x-l16"
            if not encoded:
                return None
            return json.dumps({
                "event": "playAudio",
                "media": {
                    "payload": base64.b64encode(encoded).decode("utf-8"),
                    "contentType": content_type,
                    "sampleRate": self._sfvopi_sample_rate,
                },
            })

        if isinstance(frame, (OutputTransportMessageFrame, TransportMessageFrame)):
            # Pass through any app-level JSON message (e.g. checkpoint).
            payload = frame.message
            if isinstance(payload, dict):
                return json.dumps(payload)
            return str(payload)

        return None

    # ---------- SFVoPI JSON → Pipecat Frame ----------
    async def deserialize(self, data: str | bytes) -> Optional[Frame]:
        try:
            msg = json.loads(data)
        except (ValueError, TypeError):
            return None
        if not isinstance(msg, dict):
            return None

        event = msg.get("event")

        if event == "media":
            media = msg.get("media", {})
            payload_b64 = media.get("payload")
            if not payload_b64:
                return None
            raw = base64.b64decode(payload_b64)
            # If contentType is missing, fall back to the codec this stream was configured with.
            default_type = "audio/x-l16" if self._params.codec == "L16" else "audio/PCMA"
            content_type = media.get("contentType", default_type)
            in_rate = media.get("sampleRate", self._sfvopi_sample_rate)
            if content_type == "audio/x-l16":
                # Already raw 16-bit signed LE PCM; resample only if needed.
                if in_rate != self._pipeline_sample_rate:
                    pcm = await self._input_resampler.resample(
                        raw, in_rate, self._pipeline_sample_rate
                    )
                else:
                    pcm = raw
            elif content_type == "audio/PCMU":
                # Defensive path: μ-law is not a recommended SFVoPI config.
                pcm = await ulaw_to_pcm(raw, in_rate, self._pipeline_sample_rate, self._input_resampler)
            else:  # audio/PCMA (A-law), the SFVoPI default
                pcm = await alaw_to_pcm(raw, in_rate, self._pipeline_sample_rate, self._input_resampler)
            return InputAudioRawFrame(
                audio=pcm,
                num_channels=1,
                sample_rate=self._pipeline_sample_rate,
            )

        if event == "dtmf":
            digit = msg.get("digit")
            try:
                return InputDTMFFrame(KeypadEntry(digit))
            except ValueError:
                return None

        if event in ("clearedAudio", "playedStream"):
            # Surface acks to the pipeline so app processors can react.
            return InputTransportMessageFrame(message=msg)

        # "start" is consumed before the pipeline begins (see main.py).
        return None
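Before wiring the serializer into Pipecat, it can help to exercise the JSON shapes on their own. The sketch below uses only the standard library and mirrors the field names used in the serializer above. `build_play_audio` and `parse_media` are hypothetical helpers, not part of SFVoPI or Pipecat, and the 160-byte test frame is arbitrary data standing in for A-law audio.

```python
import base64
import json
from typing import Optional, Tuple


def build_play_audio(encoded: bytes, content_type: str = "audio/PCMA",
                     sample_rate: int = 8000) -> str:
    """Wrap already-encoded audio bytes in a playAudio message."""
    return json.dumps({
        "event": "playAudio",
        "media": {
            "payload": base64.b64encode(encoded).decode("utf-8"),
            "contentType": content_type,
            "sampleRate": sample_rate,
        },
    })


def parse_media(text: str) -> Optional[Tuple[bytes, str, int]]:
    """Return (raw_bytes, content_type, sample_rate) for a media event, else None."""
    try:
        msg = json.loads(text)
    except ValueError:
        return None
    if not isinstance(msg, dict) or msg.get("event") != "media":
        return None
    media = msg.get("media", {})
    payload = media.get("payload")
    if not payload:
        return None
    return (
        base64.b64decode(payload),
        media.get("contentType", "audio/PCMA"),
        media.get("sampleRate", 8000),
    )


# Arbitrary bytes standing in for one frame of A-law audio.
frame = bytes(range(160))
incoming = json.dumps({
    "event": "media",
    "media": {
        "payload": base64.b64encode(frame).decode("utf-8"),
        "contentType": "audio/PCMA",
        "sampleRate": 8000,
    },
})
raw, ctype, rate = parse_media(incoming)
outgoing = json.loads(build_play_audio(raw, ctype, rate))
print(outgoing["event"], outgoing["media"]["contentType"], outgoing["media"]["sampleRate"])
```

Echoing caller audio back unchanged like this is also a quick way to confirm the codec settings agree in both directions before adding STT, LLM, and TTS.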
Step 2 — Read the start Event
SFVoPI sends a start event right after the WebSocket opens, carrying streamId and callId. Pipecat needs those values before building the serializer, so we read that first message manually:
# in main.py (the same helper appears in the full listing in Step 3)
import json

from fastapi import WebSocket


async def _read_start(websocket: WebSocket) -> dict:
    """Block until the SFVoPI 'start' event arrives. Return its payload."""
    while True:
        raw = await websocket.receive_text()
        try:
            msg = json.loads(raw)
        except ValueError:
            continue  # ignore anything that is not JSON
        if msg.get("event") == "start":
            return msg
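The helper above waits indefinitely if the start event never arrives. One way to bound that wait is `asyncio.wait_for`, sketched below. The function name `read_start_with_timeout`, the 5-second default, and the `FakeWebSocket` stand-in are all assumptions for illustration; only `receive_text()` is borrowed from the FastAPI WebSocket interface.

```python
import asyncio
import json


async def read_start_with_timeout(websocket, timeout_s: float = 5.0) -> dict:
    """Return the first 'start' event, or raise asyncio.TimeoutError."""

    async def _wait_for_start() -> dict:
        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except ValueError:
                continue  # skip frames that are not JSON
            if isinstance(msg, dict) and msg.get("event") == "start":
                return msg

    return await asyncio.wait_for(_wait_for_start(), timeout=timeout_s)


class FakeWebSocket:
    """Stand-in for fastapi.WebSocket so the sketch runs without a server."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def receive_text(self) -> str:
        if self._messages:
            return self._messages.pop(0)
        await asyncio.sleep(3600)  # simulate a peer that stays silent
        return ""


ws = FakeWebSocket(["not json", '{"event": "start", "streamId": "s1", "callId": "c1"}'])
start = asyncio.run(read_start_with_timeout(ws, timeout_s=1.0))
print(start["streamId"], start["callId"])
```

On timeout the handler can close the WebSocket instead of holding the connection open.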
Step 3 — FastAPI Server
Save as main.py. This runs everything: the answer webhook, the WebSocket handler, and the Pipecat pipeline.
# main.py
import json
import os

from dotenv import load_dotenv
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import JSONResponse

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

from sfvopi_serializer import SFVoPIFrameSerializer

load_dotenv()

PUBLIC_WSS_URL = os.environ.get("PUBLIC_WSS_URL", "wss://your-server.example.com/ws")

# --- Codec + sample rate ---
# SFVoPI / Indian telephony stack runs A-law @ 8 kHz end-to-end.
#   "PCMA", 8000 → default, zero SFVoPI transcoding.
#   "L16", 8000 → raw PCM in WebSocket, SFVoPI transcodes A-law ↔ PCM.
# Don't request rates > 8000: the trunk is 8 kHz, so anything else is wasted CPU.
# Don't use "PCMU": it forces a pointless A-law ↔ μ-law roundtrip.
SFVOPI_CODEC = "PCMA"
SFVOPI_SAMPLE_RATE = 8000

app = FastAPI()


# ---------- Answer Webhook ----------
@app.post("/webhook/answer")
async def answer_webhook(request: Request):
    payload = await request.json()
    print(f"[answer] call_uuid={payload.get('call_uuid')} from={payload.get('from')}")
    return JSONResponse({
        "stream": {
            "url": PUBLIC_WSS_URL,
            "codec": SFVOPI_CODEC,
            "sample_rate": SFVOPI_SAMPLE_RATE,
            "direction": "BOTH",
            "extra_headers": {
                "X-Call-UUID": payload.get("call_uuid", ""),
            },
        }
    })


# ---------- Hangup Webhook (optional) ----------
@app.post("/webhook/hangup")
async def hangup_webhook(request: Request):
    payload = await request.json()
    print(f"[hangup] call_uuid={payload.get('call_uuid')} status={payload.get('call_status')}")
    return {"ok": True}


# ---------- WebSocket: the Pipecat pipeline ----------
@app.websocket("/ws")
async def sfvopi_ws(websocket: WebSocket):
    await websocket.accept()

    # 1. Read SFVoPI start event
    start = await _read_start(websocket)
    stream_id = start["streamId"]
    call_id = start["callId"]
    print(f"[ws] stream started streamId={stream_id} callId={call_id}")

    # 2. Build the serializer
    serializer = SFVoPIFrameSerializer(
        stream_id=stream_id,
        call_id=call_id,
        params=SFVoPIFrameSerializer.InputParams(
            codec=SFVOPI_CODEC,
            sfvopi_sample_rate=SFVOPI_SAMPLE_RATE,
        ),
    )

    # 3. Build the transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_analyzer=SileroVADAnalyzer(),  # enables barge-in
            serializer=serializer,
        ),
    )

    # 4. Build the pipeline
    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        model="gpt-4o-mini",
    )
    tts = CartesiaTTSService(
        api_key=os.environ["CARTESIA_API_KEY"],
        voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",
    )

    context = OpenAILLMContext([
        {
            "role": "system",
            "content": (
                "You are a friendly phone receptionist. "
                "Keep replies under 2 sentences. "
                "Start by greeting the caller and asking how you can help."
            ),
        },
    ])
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=16000,
            audio_out_sample_rate=16000,
            allow_interruptions=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def _greet(transport, client):
        # Nudge the LLM to produce the opening line immediately.
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    await PipelineRunner(handle_sigint=False).run(task)


async def _read_start(websocket: WebSocket) -> dict:
    """Block until the SFVoPI 'start' event arrives. Return its payload."""
    while True:
        raw = await websocket.receive_text()
        try:
            msg = json.loads(raw)
        except ValueError:
            continue
        if msg.get("event") == "start":
            return msg
Step 4 — Run It
Local dev with ngrok:
# terminal 1
uvicorn main:app --host 0.0.0.0 --port 3000
# terminal 2
ngrok http 3000
Take the https://<id>.ngrok-free.app URL ngrok prints and set two things:
- In your .env: PUBLIC_WSS_URL=wss://<id>.ngrok-free.app/ws
- In your SFVoPI app (Update App):
  - answer_url → https://<id>.ngrok-free.app/webhook/answer
  - hangup_url → https://<id>.ngrok-free.app/webhook/hangup
Now call any number linked to your SFVoPI app. You should hear the bot greet you.
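Because the three URLs share one host, they can be derived from the ngrok base URL in one place, which avoids the common slip of pasting an https:// address into PUBLIC_WSS_URL. This is a small sketch: `sfvopi_urls` is a hypothetical helper, and `abc123` is a placeholder subdomain, not a real tunnel.

```python
from urllib.parse import urlsplit


def sfvopi_urls(public_https_base: str) -> dict:
    """Derive the WebSocket and webhook URLs from the public HTTPS base URL."""
    parts = urlsplit(public_https_base)
    if parts.scheme != "https" or not parts.netloc:
        raise ValueError("expected a base URL like https://<id>.ngrok-free.app")
    host = parts.netloc
    return {
        "PUBLIC_WSS_URL": f"wss://{host}/ws",
        "answer_url": f"https://{host}/webhook/answer",
        "hangup_url": f"https://{host}/webhook/hangup",
    }


urls = sfvopi_urls("https://abc123.ngrok-free.app")
for name, value in urls.items():
    print(f"{name}={value}")
```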
How Barge-In Works
Pipecat's SileroVADAnalyzer watches the caller's audio. The moment the caller starts speaking while the bot is talking, the pipeline emits an InterruptionFrame. The serializer converts it to:
{ "event": "clearAudio", "sequenceNumber": 1 }
SFVoPI drops the queued bot audio instantly and sends back:
{ "event": "clearedAudio", "sequenceNumber": 1 }
No extra code in your app. It just works.
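If you want to confirm in logs or metrics that each clearAudio was acknowledged, the sequence numbers make that straightforward. The sketch below is optional and stdlib-only. `ClearAudioTracker` is a hypothetical class that mirrors the two messages shown above; it is not part of Pipecat or SFVoPI.

```python
import json
from typing import Set


class ClearAudioTracker:
    """Pair outgoing clearAudio messages with incoming clearedAudio acks."""

    def __init__(self) -> None:
        self._seq = 0
        self._pending: Set[int] = set()

    def next_clear(self) -> str:
        """Build the next clearAudio message and remember its sequence number."""
        self._seq += 1
        self._pending.add(self._seq)
        return json.dumps({"event": "clearAudio", "sequenceNumber": self._seq})

    def on_message(self, text: str) -> bool:
        """Return True if text is a clearedAudio ack for a pending clearAudio."""
        msg = json.loads(text)
        if msg.get("event") != "clearedAudio":
            return False
        seq = msg.get("sequenceNumber")
        if seq in self._pending:
            self._pending.discard(seq)
            return True
        return False

    @property
    def pending(self) -> Set[int]:
        """Sequence numbers sent but not yet acknowledged."""
        return set(self._pending)


tracker = ClearAudioTracker()
print(tracker.next_clear())
print(tracker.on_message('{"event": "clearedAudio", "sequenceNumber": 1}'))
print(tracker.pending)
```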
Handling DTMF (Keypad Presses)
InputDTMFFrames flow through the pipeline. Wire a tiny processor ahead of the LLM to act on them:
from pipecat.frames.frames import InputDTMFFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class DTMFRouter(FrameProcessor):
    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, InputDTMFFrame):
            digit = frame.button.value
            print(f"[dtmf] caller pressed {digit}")
            if digit == "0":
                # e.g. transfer to a human, switch assistant, ...
                pass
            return  # swallow the DTMF frame
        await self.push_frame(frame, direction)
Insert it right after transport.input() in the pipeline.
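Once more than one key matters, a lookup table tends to read better than a chain of if statements. The sketch below shows that pattern in isolation. Every name in it (`DTMF_ACTIONS`, `handle_digit`, and the two action functions) is hypothetical, and the returned strings are placeholders for whatever your app actually does.

```python
from typing import Callable, Dict


def transfer_to_human() -> str:
    # Placeholder: a real app would start a transfer here.
    return "transfer"


def repeat_menu() -> str:
    # Placeholder: a real app would replay the menu prompt here.
    return "repeat"


# Keypad digit -> action. Digits not listed are ignored.
DTMF_ACTIONS: Dict[str, Callable[[], str]] = {
    "0": transfer_to_human,
    "9": repeat_menu,
}


def handle_digit(digit: str) -> str:
    """Run the action mapped to digit, or report that it was ignored."""
    action = DTMF_ACTIONS.get(digit)
    return action() if action else "ignored"


for pressed in ["0", "9", "5"]:
    print(pressed, handle_digit(pressed))
```

Inside a processor like the one above, the digit string would come from frame.button.value.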
Sending a Checkpoint
To know when a specific piece of audio has finished playing (e.g., the opening greeting), push an OutputTransportMessageFrame:
from pipecat.frames.frames import OutputTransportMessageFrame

await task.queue_frames([
    OutputTransportMessageFrame(message={
        "event": "checkpoint",
        "streamId": stream_id,
        "name": "greeting_complete",
    }),
])
When the audio before the checkpoint finishes, SFVoPI sends playedStream with name: "greeting_complete". The serializer surfaces it as an InputTransportMessageFrame — write a small FrameProcessor to watch for it.
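The waiting side can be sketched without Pipecat. In the example below, `CheckpointWaiter` is a hypothetical helper: a frame processor would call `on_message` with each playedStream payload it sees, and other code would await `wait(name)`. The timeout values are arbitrary.

```python
import asyncio
from typing import Dict, Tuple


class CheckpointWaiter:
    """Let coroutines wait until SFVoPI reports a named checkpoint as played."""

    def __init__(self) -> None:
        self._events: Dict[str, asyncio.Event] = {}

    def _event(self, name: str) -> asyncio.Event:
        return self._events.setdefault(name, asyncio.Event())

    def on_message(self, msg: dict) -> None:
        """Feed every incoming transport message here."""
        if msg.get("event") == "playedStream" and "name" in msg:
            self._event(msg["name"]).set()

    async def wait(self, name: str, timeout_s: float = 10.0) -> bool:
        """Return True once the checkpoint has played, False on timeout."""
        try:
            await asyncio.wait_for(self._event(name).wait(), timeout=timeout_s)
            return True
        except asyncio.TimeoutError:
            return False


async def demo() -> Tuple[bool, bool]:
    waiter = CheckpointWaiter()
    # Simulate SFVoPI reporting the checkpoint shortly after we start waiting.
    asyncio.get_running_loop().call_later(
        0.01, waiter.on_message, {"event": "playedStream", "name": "greeting_complete"}
    )
    hit = await waiter.wait("greeting_complete", timeout_s=1.0)
    miss = await waiter.wait("never_sent", timeout_s=0.05)
    return hit, miss


result = asyncio.run(demo())
print(result)
```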
Swapping Services
Any Pipecat-compatible STT / LLM / TTS works. For example, ElevenLabs TTS instead of Cartesia:
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService

tts = ElevenLabsTTSService(
    api_key=os.environ["ELEVENLABS_API_KEY"],
    voice_id="...",
)
No other changes.
Production Checklist
- Run uvicorn behind a reverse proxy (nginx, Caddy) with a real TLS cert. SFVoPI requires wss://.
- Set stream_timeout in your answer-webhook response to match your expected call length.
- Use extra_headers to pass X-Call-UUID so your WS handler can correlate calls in logs.
- Run multiple workers (uvicorn --workers N). Each WebSocket pins to one process, so scale horizontally.
- Cache the SileroVADAnalyzer model on first boot if cold-start latency matters.
- Handle the hangup webhook to stop any per-call billing / resources.
- Keep LLM replies short. Phone users hate monologues.
Troubleshooting
"Call connects but bot is silent."
Check PUBLIC_WSS_URL uses wss:// (not https://) and points at /ws. Look for the [ws] stream started log line.
"Bot audio sounds like chipmunks or robot."
Codec or sample-rate mismatch. Three things must agree:
- The codec + sample_rate you return from the answer-webhook.
- SFVOPI_CODEC + SFVOPI_SAMPLE_RATE in main.py.
- The contentType + sampleRate of every playAudio frame you emit.
The serializer handles resampling from the Pipecat pipeline rate (16 kHz) down to SFVoPI's 8 kHz automatically — keep audio_in_sample_rate=16000 and audio_out_sample_rate=16000 as shown. See "Codec + Sample Rate" above.
"Audio is garbled / staticky in one direction only."
You're probably sending the wrong codec on playAudio (e.g. the stream was set up as PCMA but you sent PCMU). The bytes still decode as valid audio, but through the wrong companding curve, so the result is distorted rather than silent. Recheck SFVOPI_CODEC.
"I asked for 16 kHz but STT accuracy didn't improve."
It can't. Trunk is 8 kHz — no real 16 kHz source exists. Keep sample_rate: 8000 in the answer-webhook. If your Pipecat pipeline needs 16 kHz internally, resample on your side (the serializer already does this in alaw_to_pcm / pcm_to_alaw). Your infra, your CPU — not SFVoPI's.
"Bot talks over the caller."
Make sure vad_analyzer=SileroVADAnalyzer() is set on FastAPIWebsocketParams and allow_interruptions=True is set on PipelineParams.
"Webhook times out."
SFVoPI gives the answer webhook 10 seconds. Don't do slow work in the handler; just return the Stream JSON. Heavy setup belongs in the /ws handler.
"DTMF never fires."
The caller must press keys after the WebSocket is connected. DTMF tones played before the stream starts are delivered as RFC-2833 events to the carrier and don't reach your app.
Related
- Audio Streaming Overview
- WebSocket Protocol — full event reference
- Answer Webhook — Stream JSON schema
- Pipecat docs — framework reference