Audio Processing
Learn how to build audio processors that handle real-time audio streams from SFVoPI. This guide covers the fundamentals and includes a complete echo-with-delay example.
What is an Audio Processor?
An audio processor is a component that:
- Receives audio chunks from the `media` event
- Processes the audio (transcribe, analyze, transform, record)
- Sends audio back using the `playAudio` command (optional)
- Manages state and resources (timers, buffers, connections)
Audio processors are the core of your SFVoPI application, implementing your business logic for voice AI, call recording, transcription, IVR, and more.
Audio Processor Pattern
All audio processors follow this pattern:
class AudioProcessor {
// 1. Initialize state
constructor(config) {
this.config = config;
this.state = {};
}
// 2. Handle incoming audio
onMedia(mediaEvent, sendFn) {
// Decode audio
const audioBuffer = Buffer.from(mediaEvent.media.payload, 'base64');
// Process audio (your logic here)
this.processAudio(audioBuffer);
    // Optionally send audio back, e.g.:
    // sendFn({ event: 'playAudio', media: { payload, contentType, sampleRate } });
}
// 3. Clean up resources
destroy() {
// Clear timers, close connections, etc.
}
}
Echo Processor Example
The simplest audio processor is an echo: it receives audio from the caller and plays it back after a delay. This demonstrates the core concepts without external dependencies.
Architecture
Caller speaks → media event → EchoProcessor → setTimeout(delay) → playAudio command → Caller hears echo
Implementation
TypeScript:
import type { MediaEvent, PlayAudioCommand } from './types';
export class EchoProcessor {
private timers: NodeJS.Timeout[] = [];
private delayMs: number;
constructor(delayMs: number = 500) {
this.delayMs = delayMs;
}
onMedia(mediaEvent: MediaEvent, sendFn: (command: PlayAudioCommand) => void): void {
// Schedule audio playback after delay
const timer = setTimeout(() => {
const playCommand: PlayAudioCommand = {
event: 'playAudio',
media: {
payload: mediaEvent.media.payload,
contentType: mediaEvent.media.contentType,
sampleRate: mediaEvent.media.sampleRate,
},
};
      sendFn(playCommand);
      // Drop the fired timer so the array doesn't grow for the whole call
      this.timers = this.timers.filter(t => t !== timer);
    }, this.delayMs);
    // Track timer for cleanup
    this.timers.push(timer);
}
destroy(): void {
// Clear all pending timers
this.timers.forEach(timer => clearTimeout(timer));
this.timers = [];
}
}
JavaScript:

class EchoProcessor {
constructor(delayMs = 500) {
this.delayMs = delayMs;
this.timers = [];
}
onMedia(mediaEvent, sendFn) {
// Schedule audio playback after delay
const timer = setTimeout(() => {
const playCommand = {
event: 'playAudio',
media: {
payload: mediaEvent.media.payload,
contentType: mediaEvent.media.contentType,
sampleRate: mediaEvent.media.sampleRate,
},
};
      sendFn(playCommand);
      // Drop the fired timer so the array doesn't grow for the whole call
      this.timers = this.timers.filter(t => t !== timer);
    }, this.delayMs);
    // Track timer for cleanup
    this.timers.push(timer);
}
destroy() {
// Clear all pending timers
this.timers.forEach(timer => clearTimeout(timer));
this.timers = [];
}
}
module.exports = { EchoProcessor };
Python:

import threading
class EchoProcessor:
    def __init__(self, delay_ms=500):
        self.delay_s = delay_ms / 1000.0  # threading.Timer expects seconds
        self.timers = []
def on_media(self, media_event, send_fn):
# Schedule audio playback after delay
def send_echo():
play_command = {
'event': 'playAudio',
'media': {
'payload': media_event['media']['payload'],
'contentType': media_event['media']['contentType'],
'sampleRate': media_event['media']['sampleRate'],
},
}
            send_fn(play_command)
            # Drop the fired timer so the list doesn't grow for the whole call
            if timer in self.timers:
                self.timers.remove(timer)

        timer = threading.Timer(self.delay_s, send_echo)
        timer.start()
        # Track timer for cleanup
        self.timers.append(timer)
def destroy(self):
# Cancel all pending timers
for timer in self.timers:
timer.cancel()
self.timers = []
How It Works
- Receive Audio: `onMedia()` is called for each `media` event
- Schedule Echo: `setTimeout()` schedules playback after `delayMs`
- Send Audio: After the delay, `sendFn()` sends a `playAudio` command with the same audio
- Track Timers: Store timer references for cleanup
- Cleanup: `destroy()` clears all pending timers when the call ends
Key Concepts
- No Audio Decoding: Echo doesn't need to decode audio; it passes the base64 payload through unchanged (see the decoding sketch after this list)
- Timer Management: Track all timers to prevent memory leaks
- Cleanup: Always implement `destroy()` to clean up resources
- Send Function: Use the provided `sendFn` callback to send commands
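When a processor does need raw samples (for analysis or for feeding a speech model), the payload must be decoded first. A minimal sketch, assuming the stream carries G.711 μ-law (audio/PCMU) as in this guide's examples; the helper name is illustrative:

```typescript
// Sketch: decode 8-bit G.711 mu-law samples to 16-bit linear PCM.
// Assumes an audio/PCMU stream, as used throughout this guide's examples.
function muLawToPcm16(mulaw: Buffer): Int16Array {
  const pcm = new Int16Array(mulaw.length);
  for (let i = 0; i < mulaw.length; i++) {
    const u = ~mulaw[i] & 0xff;          // mu-law bytes are stored inverted
    const sign = u & 0x80;
    const exponent = (u >> 4) & 0x07;
    const mantissa = u & 0x0f;
    // Rebuild the magnitude, then remove the 0x84 encoding bias
    const sample = (((mantissa << 3) + 0x84) << exponent) - 0x84;
    pcm[i] = sign ? -sample : sample;
  }
  return pcm;
}
```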
Usage in WebSocket Handler
TypeScript:
import { WebSocketServer, WebSocket } from 'ws';
import { EchoProcessor } from './echo';
import type { IncomingEvent, PlayAudioCommand } from './types';
const wss = new WebSocketServer({ port: 3000 });
wss.on('connection', (ws: WebSocket) => {
const echo = new EchoProcessor(500); // 500ms delay
const sendAudio = (command: PlayAudioCommand): void => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(command));
}
};
ws.on('message', (data: Buffer) => {
const event: IncomingEvent = JSON.parse(data.toString());
switch (event.event) {
case 'start':
console.log(`Stream started: ${event.streamId}`);
break;
case 'media':
echo.onMedia(event, sendAudio);
break;
case 'dtmf':
console.log(`DTMF pressed: ${event.digit}`);
break;
}
});
ws.on('close', () => {
echo.destroy();
});
ws.on('error', (err) => {
console.error('WebSocket error:', err);
echo.destroy();
});
});
JavaScript:

const { WebSocketServer } = require('ws');
const { EchoProcessor } = require('./echo');
const wss = new WebSocketServer({ port: 3000 });
wss.on('connection', (ws) => {
const echo = new EchoProcessor(500); // 500ms delay
const sendAudio = (command) => {
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify(command));
}
};
ws.on('message', (data) => {
const event = JSON.parse(data.toString());
switch (event.event) {
case 'start':
console.log(`Stream started: ${event.streamId}`);
break;
case 'media':
echo.onMedia(event, sendAudio);
break;
case 'dtmf':
console.log(`DTMF pressed: ${event.digit}`);
break;
}
});
ws.on('close', () => {
echo.destroy();
});
ws.on('error', (err) => {
console.error('WebSocket error:', err);
echo.destroy();
});
});
Testing the Echo
- Start your WebSocket server with the echo processor
- Create an SFVoPI app and link a phone number
- Make a call to the linked number
- Speak into the phone — you'll hear your voice echoed back after 500ms
Expected behavior:
- Caller says "Hello" → 500ms delay → Caller hears "Hello"
- Continuous echo as long as caller speaks
- Echo stops when caller stops speaking
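You can also exercise the processor without placing a real call by driving the server with a small WebSocket test client. A minimal sketch, assuming the server above is listening on ws://localhost:3000 and the event shapes shown in this guide; the payload is dummy base64, not real audio:

```typescript
// test-client.ts: drive the echo server locally without a phone call.
import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:3000');

ws.on('open', () => {
  // Announce the stream, then send one fake media chunk.
  ws.send(JSON.stringify({ event: 'start', streamId: 'test-stream-1' }));
  ws.send(JSON.stringify({
    event: 'media',
    streamId: 'test-stream-1',
    media: {
      payload: Buffer.from('not real audio').toString('base64'),
      contentType: 'audio/PCMU', // codec assumed from the examples above
      sampleRate: 8000,
    },
  }));
});

ws.on('message', (data) => {
  // The playAudio echo should arrive roughly 500ms after the media event.
  console.log('Received:', JSON.parse(data.toString()));
  ws.close();
});
```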
Advanced Audio Processors
1. Call Recording Processor
Records all audio to a file or cloud storage.
TypeScript:
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import type { MediaEvent } from './types';
export class RecordingProcessor {
private audioChunks: Buffer[] = [];
private streamId: string;
private s3Client: S3Client;
constructor(streamId: string) {
this.streamId = streamId;
this.s3Client = new S3Client({ region: 'us-east-1' });
}
onMedia(mediaEvent: MediaEvent): void {
// Decode and store audio
const audioBuffer = Buffer.from(mediaEvent.media.payload, 'base64');
this.audioChunks.push(audioBuffer);
}
async destroy(): Promise<void> {
// Concatenate all chunks
const fullAudio = Buffer.concat(this.audioChunks);
// Upload to S3
const command = new PutObjectCommand({
Bucket: 'my-call-recordings',
Key: `recordings/${this.streamId}.pcm`,
Body: fullAudio,
ContentType: 'audio/pcm',
});
await this.s3Client.send(command);
console.log(`Recording saved: ${this.streamId}.pcm`);
}
}
JavaScript:

const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3');
class RecordingProcessor {
constructor(streamId) {
this.streamId = streamId;
this.audioChunks = [];
this.s3Client = new S3Client({ region: 'us-east-1' });
}
onMedia(mediaEvent) {
// Decode and store audio
const audioBuffer = Buffer.from(mediaEvent.media.payload, 'base64');
this.audioChunks.push(audioBuffer);
}
async destroy() {
// Concatenate all chunks
const fullAudio = Buffer.concat(this.audioChunks);
// Upload to S3
const command = new PutObjectCommand({
Bucket: 'my-call-recordings',
Key: `recordings/${this.streamId}.pcm`,
Body: fullAudio,
ContentType: 'audio/pcm',
});
await this.s3Client.send(command);
console.log(`Recording saved: ${this.streamId}.pcm`);
}
}
module.exports = { RecordingProcessor };
Key Points:
- Store audio chunks in memory (or stream to disk for long calls)
- Upload to S3 (or your storage) when call ends
- Consider compressing recordings to MP3 for smaller files, or at least wrapping the raw PCM in a WAV container so standard players can open it (sketch below)
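As a sketch of the WAV option: raw 16-bit mono PCM becomes playable in standard tools once a 44-byte RIFF header is prepended. The helper below is illustrative; if your stream is μ-law, decode it to PCM16 first (for example with the decoder shown earlier):

```typescript
// Sketch: wrap raw 16-bit mono PCM in a WAV container by prepending
// a standard 44-byte RIFF header. Assumes little-endian samples.
function pcmToWav(pcm: Buffer, sampleRate = 8000, channels = 1, bitsPerSample = 16): Buffer {
  const byteRate = sampleRate * channels * (bitsPerSample / 8);
  const blockAlign = channels * (bitsPerSample / 8);
  const header = Buffer.alloc(44);

  header.write('RIFF', 0);
  header.writeUInt32LE(36 + pcm.length, 4); // total file size minus 8
  header.write('WAVE', 8);
  header.write('fmt ', 12);
  header.writeUInt32LE(16, 16);             // fmt chunk size
  header.writeUInt16LE(1, 20);              // audio format 1 = linear PCM
  header.writeUInt16LE(channels, 22);
  header.writeUInt32LE(sampleRate, 24);
  header.writeUInt32LE(byteRate, 28);
  header.writeUInt16LE(blockAlign, 32);
  header.writeUInt16LE(bitsPerSample, 34);
  header.write('data', 36);
  header.writeUInt32LE(pcm.length, 40);     // data chunk size

  return Buffer.concat([header, pcm]);
}
```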
2. Transcription Processor
Transcribes audio in real-time using Deepgram or Whisper.
TypeScript:
import { createClient, LiveTranscriptionEvents } from '@deepgram/sdk';
import type { MediaEvent } from './types';
export class TranscriptionProcessor {
private deepgram: any;
private connection: any;
constructor() {
this.deepgram = createClient(process.env.DEEPGRAM_API_KEY);
// Open live transcription connection
this.connection = this.deepgram.listen.live({
model: 'nova-2',
language: 'en-US',
smart_format: true,
encoding: 'mulaw',
sample_rate: 8000,
});
this.connection.on(LiveTranscriptionEvents.Transcript, (data: any) => {
const transcript = data.channel.alternatives[0].transcript;
if (transcript) {
console.log(`Transcript: ${transcript}`);
// Store transcript, trigger actions, etc.
}
});
}
onMedia(mediaEvent: MediaEvent): void {
// Decode audio
const audioBuffer = Buffer.from(mediaEvent.media.payload, 'base64');
// Send to Deepgram
this.connection.send(audioBuffer);
}
destroy(): void {
this.connection.finish();
}
}
JavaScript:

const { createClient, LiveTranscriptionEvents } = require('@deepgram/sdk');
class TranscriptionProcessor {
constructor() {
this.deepgram = createClient(process.env.DEEPGRAM_API_KEY);
// Open live transcription connection
this.connection = this.deepgram.listen.live({
model: 'nova-2',
language: 'en-US',
smart_format: true,
encoding: 'mulaw',
sample_rate: 8000,
});
this.connection.on(LiveTranscriptionEvents.Transcript, (data) => {
const transcript = data.channel.alternatives[0].transcript;
if (transcript) {
console.log(`Transcript: ${transcript}`);
// Store transcript, trigger actions, etc.
}
});
}
onMedia(mediaEvent) {
// Decode audio
const audioBuffer = Buffer.from(mediaEvent.media.payload, 'base64');
// Send to Deepgram
this.connection.send(audioBuffer);
}
destroy() {
this.connection.finish();
}
}
module.exports = { TranscriptionProcessor };
Key Points:
- Use streaming transcription APIs (Deepgram, AssemblyAI, Google Speech-to-Text)
- Match encoding and sample rate to your Stream JSON config
- Handle partial transcripts (interim results) vs final transcripts
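For the interim-versus-final distinction, Deepgram's live results carry an is_final flag on each message. A short sketch (the handler mirrors the constructor code above; treat the field names as Deepgram's, not SFVoPI's):

```typescript
import { LiveTranscriptionEvents } from '@deepgram/sdk';

// Sketch: separate interim results from final transcripts.
// `connection` is the live connection created in the constructor above.
function registerTranscriptHandler(connection: any): void {
  connection.on(LiveTranscriptionEvents.Transcript, (data: any) => {
    const transcript = data.channel.alternatives[0].transcript;
    if (!transcript) return;
    if (data.is_final) {
      console.log(`Final: ${transcript}`);   // stable; safe to store or act on
    } else {
      console.log(`Interim: ${transcript}`); // provisional; may still be revised
    }
  });
}
```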
3. Voice AI Processor
Combines transcription, LLM, and TTS for conversational AI.
// Conceptual sketch: `ElevenLabs`, `Message`, and the event-emitting
// TranscriptionProcessor below are illustrative stand-ins; adapt them to your SDKs.
import OpenAI from 'openai';
import type { MediaEvent, PlayAudioCommand } from './types';

export class VoiceAIProcessor {
  private transcription: TranscriptionProcessor;
  private llm: OpenAI;
  private tts: ElevenLabs;
  private conversationHistory: Message[] = [];
  private sendFn: ((command: PlayAudioCommand) => void) | null = null;
constructor() {
this.transcription = new TranscriptionProcessor();
this.llm = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
this.tts = new ElevenLabs({ apiKey: process.env.ELEVENLABS_API_KEY });
    // Listen for transcripts (assumes TranscriptionProcessor is extended to
    // emit 'transcript' events, e.g. by subclassing EventEmitter)
    this.transcription.on('transcript', async (text: string) => {
await this.handleUserMessage(text);
});
}
async handleUserMessage(text: string): Promise<void> {
// Add to conversation history
this.conversationHistory.push({ role: 'user', content: text });
// Generate response with LLM
const response = await this.llm.chat.completions.create({
model: 'gpt-4',
messages: this.conversationHistory,
});
    const assistantMessage = response.choices[0].message.content ?? '';
this.conversationHistory.push({ role: 'assistant', content: assistantMessage });
// Convert to speech
const audioStream = await this.tts.generate({
text: assistantMessage,
voice: 'rachel',
model_id: 'eleven_turbo_v2',
});
    // Send audio to the caller. The TTS output must match the stream's codec;
    // transcode to 8 kHz mu-law here if your TTS returns MP3 or linear PCM.
    for await (const chunk of audioStream) {
const playCommand = {
event: 'playAudio',
media: {
payload: chunk.toString('base64'),
contentType: 'audio/PCMU',
sampleRate: 8000,
},
};
      this.sendFn?.(playCommand);
}
}
  onMedia(mediaEvent: MediaEvent, sendFn: (command: PlayAudioCommand) => void): void {
    this.sendFn = sendFn;
this.transcription.onMedia(mediaEvent);
}
destroy(): void {
this.transcription.destroy();
}
}
Key Points:
- Chain multiple services: transcription → LLM → TTS
- Manage conversation state and history
- Handle interruptions (send a `clearAudio` command when the user interrupts; see the sketch below)
- Stream TTS audio in chunks for lower latency
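A sketch of the interruption case, since it touches both sides of the pipeline. The exact clearAudio payload is defined in the WebSocket Protocol guide; a minimal { event: 'clearAudio' } shape is assumed here:

```typescript
// Sketch: cut off in-progress playback when the caller starts speaking.
// The clearAudio payload shape is an assumption; check the protocol guide.
function handleBargeIn(
  sendFn: (command: object) => void,
  pendingTimers: NodeJS.Timeout[],
): void {
  // Stop any TTS chunks still queued locally...
  pendingTimers.forEach(timer => clearTimeout(timer));
  pendingTimers.length = 0;
  // ...then ask SFVoPI to drop audio already buffered for playback.
  sendFn({ event: 'clearAudio' });
}
```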
Best Practices
1. Resource Management
Always clean up resources in destroy():
destroy(): void {
// Clear timers
this.timers.forEach(timer => clearTimeout(timer));
// Close connections
this.transcriptionClient?.close();
this.databaseConnection?.end();
// Clear buffers
this.audioChunks = [];
}
2. Error Handling
Wrap processing logic in try-catch:
onMedia(mediaEvent: MediaEvent, sendFn: Function): void {
try {
const audioBuffer = Buffer.from(mediaEvent.media.payload, 'base64');
this.processAudio(audioBuffer);
} catch (error) {
console.error('Error processing audio:', error);
// Don't crash—log and continue
}
}
3. Backpressure Handling
Don't overwhelm external services:
private processingQueue: MediaEvent[] = [];
private isProcessing = false;
async onMedia(mediaEvent: MediaEvent): Promise<void> {
this.processingQueue.push(mediaEvent);
if (!this.isProcessing) {
this.isProcessing = true;
await this.processQueue();
this.isProcessing = false;
}
}
private async processQueue(): Promise<void> {
while (this.processingQueue.length > 0) {
const event = this.processingQueue.shift()!;
await this.processAudioChunk(event);
}
}
4. State Management
Track state per stream:
interface StreamState {
streamId: string;
callId: string;
startTime: Date;
transcripts: string[];
audioChunks: Buffer[];
}
private streams = new Map<string, StreamState>();
onStart(startEvent: StartEvent): void {
this.streams.set(startEvent.streamId, {
streamId: startEvent.streamId,
callId: startEvent.callId,
startTime: new Date(),
transcripts: [],
audioChunks: [],
});
}
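The matching teardown belongs in a handler for the stream-end event. A short sketch, assuming a stop event that carries the streamId (see the WebSocket Protocol guide for the exact shape):

```typescript
// Sketch: release per-stream state when a stream ends.
// The `stop` event shape is assumed; consult the protocol guide.
onStop(stopEvent: { streamId: string }): void {
  const state = this.streams.get(stopEvent.streamId);
  if (state) {
    const durationMs = Date.now() - state.startTime.getTime();
    console.log(`Stream ${state.streamId} ended after ${durationMs} ms`);
    this.streams.delete(stopEvent.streamId);
  }
}
```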
5. Logging and Monitoring
Log key events for debugging:
onMedia(mediaEvent: MediaEvent): void {
  const audioBuffer = Buffer.from(mediaEvent.media.payload, 'base64');
  console.log(`[${mediaEvent.streamId}] Received ${audioBuffer.length} bytes`);
  // Track metrics (this.metrics is initialized in the constructor)
  this.metrics.audioChunksReceived++;
  this.metrics.totalBytesReceived += audioBuffer.length;
}
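The this.metrics object above is illustrative rather than part of any SFVoPI API; one minimal way to back it:

```typescript
// Sketch: a tiny metrics container with periodic reporting.
interface ProcessorMetrics {
  audioChunksReceived: number;
  totalBytesReceived: number;
}

const metrics: ProcessorMetrics = { audioChunksReceived: 0, totalBytesReceived: 0 };

// Report once a minute; clear this interval in destroy() like any other timer.
const reporter = setInterval(() => {
  console.log(`chunks=${metrics.audioChunksReceived} bytes=${metrics.totalBytesReceived}`);
}, 60_000);
// ...later, in destroy(): clearInterval(reporter);
```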
Next Steps
- WebSocket Protocol — Learn all 8 event types
- Overview — Understand codecs and architecture
- Examples — See complete working examples
- Answer Webhook — Configure Stream JSON response