
In this part we build a real-time transcription service that transcribes the audio of the news broadcast. We use a client/server model so that the setup can later scale to multiple streams.
Why Even Transcribe the Speech?
Plain text news is already scraped and parsed by countless algorithms, which means there is little alpha left in it for us.
So we go more raw and unstructured, and extract everything we can directly from the broadcast.
That gives our AI agent far more data and context than parsing curated news feeds alone.
Architecture
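The setup consists of two small Python programs. The client captures the system's audio output through a loopback device and streams it, chunk by chunk, over a WebSocket connection. The server accepts one or more of these streams, buffers the incoming audio per client, and runs OpenAI's Whisper model on each accumulated segment to produce the transcription. Because client and server are decoupled, additional broadcast streams can later be handled by simply connecting more clients.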
Implementation
Client
Client Code Explanation:
The client code captures your system's audio output using the PyAudio library. It first lists available audio devices and prompts you to select a loopback device (a virtual input capturing system sound). Once a device is chosen, it initializes an audio stream and retrieves important configuration details such as the number of channels, sample rate, and sample width. This information is sent to a WebSocket server to ensure that the server can correctly process the incoming audio. The client then continuously reads small chunks of audio data, encodes these chunks in base64, and streams them over an open WebSocket connection. It also handles potential errors and connection interruptions by attempting to reconnect, ensuring robust real-time audio streaming.
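Concretely, client and server speak a minimal two-message JSON protocol; the shapes below are taken directly from the code that follows:

# Sent once per connection: announces the audio format so the server
# can interpret the raw PCM bytes that follow.
format_message = {
    "event": "format",
    "data": {
        "channels": 2,         # e.g. stereo
        "sample_rate": 44100,  # e.g. 44.1 kHz
        "sample_width": 2,     # bytes per sample (16-bit PCM)
        "format": "int16"
    }
}

# Sent repeatedly: one base64-encoded chunk of raw PCM audio.
audio_message = {
    "event": "audio",
    "data": "<base64-encoded PCM chunk>"
}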
Client Code:
#!/usr/bin/env python3
"""
Real-time Audio Streaming Client
This script captures system audio output and streams it to a WebSocket server.
"""
import asyncio
import websockets
import pyaudio
import base64
import json
import platform
import argparse

# Audio parameters
CHUNK = 1024              # Number of frames per buffer
FORMAT = pyaudio.paInt16  # Audio format (16-bit PCM)
CHANNELS = 2              # Stereo (default; the selected device's channel count is used at runtime)
RATE = 44100              # Sample rate in Hz (default; the selected device's rate is used at runtime)
DEFAULT_WEBSOCKET_URL = "ws://192.168.100.51:8765"


def show_setup_instructions():
    """Display platform-specific loopback setup instructions."""
    os_name = platform.system()
    print("\n=== Audio Loopback Setup Instructions ===")
    if os_name == "Windows":
        print("Windows Setup:")
        print("1. Install VB-Cable (Virtual Audio Cable): https://vb-audio.com/Cable/")
        print("2. In Windows Sound settings, set VB-Cable as your default playback device")
        print("3. Select VB-Cable Output as your input device in this application")
    elif os_name == "Darwin":  # macOS
        print("macOS Setup:")
        print("1. Install Soundflower: https://github.com/mattingalls/Soundflower/releases")
        print("2. Open Audio MIDI Setup and create a Multi-Output Device")
        print("3. Set this Multi-Output Device as your default output")
        print("4. Select Soundflower (2ch) as your input device in this application")
    elif os_name == "Linux":
        print("Linux Setup:")
        print("1. Using PulseAudio: pactl load-module module-loopback latency_msec=1")
        print("2. Using PipeWire: pw-loopback")
        print("3. Select the appropriate monitor device in this application")
    print("========================================\n")


async def stream_audio(websocket_url=DEFAULT_WEBSOCKET_URL):
    """Capture and stream audio to the WebSocket server."""
    p = pyaudio.PyAudio()

    # Show setup instructions
    show_setup_instructions()

    # Print available devices
    print("Available audio devices:")
    for i in range(p.get_device_count()):
        dev_info = p.get_device_info_by_index(i)
        print(f"Device {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']}, outputs: {dev_info['maxOutputChannels']})")

    # Ask for loopback device index
    device_index = int(input("Enter the loopback device index: "))
    device_info = p.get_device_info_by_index(device_index)
    device_channels = device_info['maxInputChannels']
    if device_channels <= 0:
        print("Selected device has no input channels. Please choose a device with input capability.")
        p.terminate()
        return
    device_rate = int(device_info['defaultSampleRate'])

    # Open stream
    try:
        stream = p.open(
            format=FORMAT,
            channels=device_channels,
            rate=device_rate,
            input=True,
            frames_per_buffer=CHUNK,
            input_device_index=device_index
        )
    except Exception as e:
        print(f"Error opening stream: {e}")
        print("Please check your device selection and setup.")
        p.terminate()
        return

    print(f"Stream opened, connecting to WebSocket server at {websocket_url}...")

    # Prepare format info to be sent to the server upon connection
    format_info = {
        "event": "format",
        "data": {
            "channels": device_channels,
            "sample_rate": device_rate,
            "sample_width": pyaudio.get_sample_size(FORMAT),
            "format": "int16"
        }
    }

    try:
        while True:
            try:
                async with websockets.connect(websocket_url) as websocket:
                    print("Connected to WebSocket server, streaming audio...")
                    await websocket.send(json.dumps(format_info))
                    while True:
                        # Read audio data
                        audio_data = stream.read(CHUNK, exception_on_overflow=False)
                        # Convert to base64 for sending
                        encoded_data = base64.b64encode(audio_data).decode('utf-8')
                        # Create a message packet
                        message = {
                            "event": "audio",
                            "data": encoded_data
                        }
                        # Send to the server
                        await websocket.send(json.dumps(message))
            except Exception as e:
                print(f"WebSocket connection error: {e}")
                print("Attempting to reconnect in 3 seconds...")
                await asyncio.sleep(3)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Clean up
        stream.stop_stream()
        stream.close()
        p.terminate()
        print("Audio streaming ended")


def main():
    """Parse command line arguments and start the audio streaming."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Client')
    parser.add_argument('--url', default=DEFAULT_WEBSOCKET_URL,
                        help=f'WebSocket server URL (default: {DEFAULT_WEBSOCKET_URL})')
    args = parser.parse_args()
    try:
        asyncio.run(stream_audio(args.url))
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
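To run the client, point it at your server; the filename audio_client.py below is my own choice for where the script was saved:

# Stream to a specific server
python audio_client.py --url ws://192.168.100.51:8765

# Or use the default URL hard-coded in the script
python audio_client.py

The script prints the available audio devices and asks for the index of your loopback device before it starts streaming.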
Server
Server Code Explanation:
The server code sets up a WebSocket server to receive and process audio streams from multiple clients. When a client connects, the server registers it and listens for two types of events: a 'format' event containing the audio configuration (channels, sample rate, etc.) and 'audio' events carrying base64-encoded audio chunks. Upon receiving audio data, the server optionally applies audio effects and stores the chunk for later saving to a WAV file. For real-time transcription it accumulates audio samples until a threshold is reached and then runs the Whisper model in a separate thread to transcribe the buffered segment. If audio playback is enabled and supported, the server plays the audio using the pydub library. After transcription, the transcribed text is printed out. On disconnection the server cleans up the client's data, saving the complete recording to a file if required.
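To make the buffering threshold concrete, here is the arithmetic for the client's typical format of 44.1 kHz stereo 16-bit PCM (other formats scale accordingly):

sample_rate = 44100   # frames per second
sample_width = 2      # bytes per 16-bit sample
channels = 2          # stereo

# One second of raw PCM audio:
bytes_per_second = sample_rate * sample_width * channels  # 176400 bytes

# The server triggers a transcription once 1.5 seconds have accumulated:
threshold = int(1.5 * bytes_per_second)                   # 264600 bytes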
Server Code:
#!/usr/bin/env python3
"""
Real-time Audio Streaming Server
This script receives audio streams from clients via WebSockets and can process,
play, or save the audio data.
"""
import asyncio
import websockets
import json
import base64
import wave
import os
import argparse
from datetime import datetime
import numpy as np
import io
import tempfile  # for temporary WAV file handling for transcription
import whisper

# Try to import optional libraries for audio playback
try:
    from pydub import AudioSegment
    from pydub.playback import play
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False
    print("pydub not available. Audio playback will be disabled.")

# Server parameters
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8765

# Storage for audio data
connected_clients = set()
recording_data = {}

# Global variable holding the Whisper model, loaded once at startup
whisper_model = whisper.load_model("large")

# Processing options (can be set via command line args)
SAVE_TO_FILE = True
PLAY_AUDIO = False
APPLY_EFFECTS = False


def apply_audio_effects(audio_bytes, format_info):
    """Apply audio effects to the audio data."""
    # Convert audio bytes to numpy array
    audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
    # Reshape for stereo if needed
    if format_info["channels"] == 2:
        audio_array = audio_array.reshape(-1, 2)
    # Apply simple effect (e.g., gain)
    audio_array = audio_array * 1.5  # Increase volume by 50%
    # Clip to avoid distortion
    audio_array = np.clip(audio_array, -32768, 32767).astype(np.int16)
    # Convert back to bytes
    return audio_array.tobytes()


async def handle_client(websocket, path=None):
    """Handle a WebSocket client connection."""
    client_id = id(websocket)
    connected_clients.add(websocket)
    recording_data[client_id] = {
        "format_info": None,
        "audio_data": [],
        "transcription_buffer": b"",
        "transcription_in_progress": False
    }
    print(f"Client {client_id} connected")
    try:
        async for message in websocket:
            data = json.loads(message)
            event_type = data.get("event")
            if event_type == "format":
                # Store format information
                recording_data[client_id]["format_info"] = data.get("data")
                print(f"Received format info from client {client_id}: {data.get('data')}")
            elif event_type == "audio":
                # Decode audio data
                audio_bytes = base64.b64decode(data.get("data"))
                # Apply effects if enabled
                if APPLY_EFFECTS and recording_data[client_id]["format_info"]:
                    audio_bytes = apply_audio_effects(audio_bytes, recording_data[client_id]["format_info"])
                # Store audio data for potential saving
                if SAVE_TO_FILE:
                    recording_data[client_id]["audio_data"].append(audio_bytes)
                # Real-time transcription using OpenAI Whisper: accumulate audio for a longer segment
                if whisper_model is not None and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    # Calculate the number of bytes corresponding to 1.5 seconds of audio
                    bytes_per_second = format_info["sample_rate"] * format_info["sample_width"] * format_info["channels"]
                    threshold = int(1.5 * bytes_per_second)
                    # Accumulate the received audio_bytes into the transcription buffer
                    recording_data[client_id]["transcription_buffer"] += audio_bytes
                    # If the buffer has reached the threshold and no transcription is running, start transcription
                    if len(recording_data[client_id]["transcription_buffer"]) >= threshold and not recording_data[client_id]["transcription_in_progress"]:
                        buffer_to_transcribe = recording_data[client_id]["transcription_buffer"]
                        recording_data[client_id]["transcription_buffer"] = b""
                        recording_data[client_id]["transcription_in_progress"] = True
                        asyncio.create_task(transcribe_audio(buffer_to_transcribe, format_info, client_id))
                else:
                    print("whisper_model NONE")
                # Play audio if enabled
                if PLAY_AUDIO and PYDUB_AVAILABLE and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    try:
                        # Create an in-memory WAV file
                        wav_file = io.BytesIO()
                        with wave.open(wav_file, 'wb') as wf:
                            wf.setnchannels(format_info["channels"])
                            wf.setsampwidth(format_info["sample_width"])
                            wf.setframerate(format_info["sample_rate"])
                            wf.writeframes(audio_bytes)
                        wav_file.seek(0)
                        # Play audio using pydub
                        audio_segment = AudioSegment.from_wav(wav_file)
                        play(audio_segment)
                    except Exception as e:
                        print(f"Error playing audio: {e}")
    except Exception as e:
        print(f"Error with client {client_id}: {e}")
    finally:
        connected_clients.discard(websocket)
        # Save audio data to file on disconnect
        if SAVE_TO_FILE and client_id in recording_data:
            save_to_file(client_id)
        if client_id in recording_data:
            del recording_data[client_id]
        print(f"Client {client_id} disconnected")


def save_to_file(client_id):
    """Save the recorded audio data to a WAV file."""
    if not recording_data[client_id]["audio_data"]:
        print(f"No audio data to save for client {client_id}")
        return
    format_info = recording_data[client_id]["format_info"]
    if not format_info:
        print(f"No format info available for client {client_id}")
        return
    # Create output directory if it doesn't exist
    os.makedirs("recordings", exist_ok=True)
    # Create a filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recordings/audio_recording_{client_id}_{timestamp}.wav"
    # Save to WAV file
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(format_info["channels"])
        wf.setsampwidth(format_info["sample_width"])
        wf.setframerate(format_info["sample_rate"])
        wf.writeframes(b''.join(recording_data[client_id]["audio_data"]))
    print(f"Saved recording to {filename}")


async def transcribe_audio(audio_buffer, format_info, client_id):
    """Run transcription in a separate thread and handle subsequent buffered audio."""
    # Check if the audio is effectively silent; if so, return an empty transcription
    audio_np = np.frombuffer(audio_buffer, dtype=np.int16)
    if np.max(np.abs(audio_np)) < 500:
        if client_id in recording_data:
            recording_data[client_id]["transcription_in_progress"] = False
        return ""
    # Write the buffer to a temporary WAV file (closed first so it can be reopened by name)
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        temp_filename = tmp.name
    with wave.open(temp_filename, 'wb') as wf:
        wf.setnchannels(format_info["channels"])
        wf.setsampwidth(format_info["sample_width"])
        wf.setframerate(format_info["sample_rate"])
        wf.writeframes(audio_buffer)
    # Run transcription in a separate thread
    try:
        result = await asyncio.wait_for(asyncio.to_thread(whisper_model.transcribe, temp_filename), timeout=30)
    except asyncio.TimeoutError:
        print(f"Transcription timed out for client {client_id}")
        os.unlink(temp_filename)
        if client_id in recording_data:
            recording_data[client_id]["transcription_in_progress"] = False
        return ""
    transcription_text = result["text"]
    print(f"Transcription (client {client_id}): {transcription_text}")
    os.unlink(temp_filename)  # manually delete the temporary file
    # Reset the transcription_in_progress flag (the client may have disconnected meanwhile)
    if client_id in recording_data:
        recording_data[client_id]["transcription_in_progress"] = False
    return transcription_text


async def main():
    """Parse command line arguments and start the WebSocket server."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Server')
    parser.add_argument('--host', default=DEFAULT_HOST,
                        help=f'Host address (default: {DEFAULT_HOST})')
    parser.add_argument('--port', type=int, default=DEFAULT_PORT,
                        help=f'Port number (default: {DEFAULT_PORT})')
    parser.add_argument('--save', action='store_true',
                        help='Save audio to file')
    parser.add_argument('--play', action='store_true',
                        help='Play audio in real-time')
    parser.add_argument('--effects', action='store_true',
                        help='Apply audio effects')
    args = parser.parse_args()

    global SAVE_TO_FILE, PLAY_AUDIO, APPLY_EFFECTS
    SAVE_TO_FILE = args.save
    PLAY_AUDIO = args.play and PYDUB_AVAILABLE
    APPLY_EFFECTS = args.effects
    if args.play and not PYDUB_AVAILABLE:
        print("Warning: pydub is not available. Audio playback is disabled.")

    server = await websockets.serve(handle_client, args.host, args.port)
    print(f"WebSocket server started at ws://{args.host}:{args.port}")
    print(f"Save to file: {SAVE_TO_FILE}, Play audio: {PLAY_AUDIO}, Apply effects: {APPLY_EFFECTS}")
    await server.wait_closed()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")
In later articles we will also explore using the transcription text to let the agent learn more about entities and events, remember persons, and more. So stay tuned, and do not forget to subscribe.
DISCLAIMER: The provided code does not present a production-ready setup regarding security and stability.
All code presented in this tutorial is used at your own risk.
Consider always performing security audits before putting any code in production.
Furthermore, none of the parts of the tutorials or code content should be considered as financial advice.
Always consult a professional investment advisor before making any investment decisions.
At CorticalFlow, expanding the cognitive ability of the user is our mission.
- Check out our Website CorticalFlow
- Follow us on Twitter @CorticalFlow
- Check out our GitHub CorticalFlow
- Check out our YouTube CorticalFlow