AI Case Study: Implementing a Real-time Financial News Agent - Part 4: Real-time Transcription

published on March 30, 2025

[Image: a microphone used for speech transcription]

In this part we will use a real-time transcription service to transcribe the audio of the news broadcast. We use a client/server model so that we can later scale to multiple parallel streams.

Why even transcribe the speech?

Pure text news feeds are already scraped and mined by countless algorithms, which means there is little alpha left in them for us.

Instead, we go to the raw, unstructured source and extract everything we can from the broadcast itself.

That gives our AI agent far more data and context than parsing curated news feeds alone.

Architecture

The setup consists of two parts: a client that captures the system's audio output and streams it over a WebSocket connection, and a server that buffers the incoming audio and transcribes it with OpenAI's Whisper model.
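
The client and server exchange a small JSON protocol over the WebSocket connection. As a sketch, these are the two message types used in the code below (the values shown match the client's defaults):

{"event": "format", "data": {"channels": 2, "sample_rate": 44100, "sample_width": 2, "format": "int16"}}

{"event": "audio", "data": "<base64-encoded PCM chunk>"}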

Implementation
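
Both scripts are plain Python. Judging from their imports, an installation along these lines should cover the dependencies (package names are the usual PyPI ones; PyAudio additionally requires the PortAudio system library, and Whisper expects ffmpeg on the PATH):

pip install websockets pyaudio numpy openai-whisper pydub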

Client

Client Code Explanation:

The client code captures your system's audio output using the PyAudio library. It first lists available audio devices and prompts you to select a loopback device (a virtual input capturing system sound). Once a device is chosen, it initializes an audio stream and retrieves important configuration details such as the number of channels, sample rate, and sample width. This information is sent to a WebSocket server to ensure that the server can correctly process the incoming audio. The client then continuously reads small chunks of audio data, encodes these chunks in base64, and streams them over an open WebSocket connection. It also handles potential errors and connection interruptions by attempting to reconnect, ensuring robust real-time audio streaming.

Client Code:

#!/usr/bin/env python3
"""
Real-time Audio Streaming Client

This script captures system audio output and streams it to a WebSocket server.
"""

import asyncio
import websockets
import pyaudio
import base64
import json
import platform
import argparse

# Audio parameters
CHUNK = 1024  # Number of frames per buffer
FORMAT = pyaudio.paInt16  # Audio format (16-bit PCM)
CHANNELS = 2  # Stereo (default; the selected device's channel count is used at runtime)
RATE = 44100  # Sample rate in Hz (default; the selected device's rate is used at runtime)
DEFAULT_WEBSOCKET_URL = "ws://192.168.100.51:8765"


def show_setup_instructions():
    """Display platform-specific loopback setup instructions."""
    os_name = platform.system()
    print("\n=== Audio Loopback Setup Instructions ===")

    if os_name == "Windows":
        print("Windows Setup:")
        print("1. Install VB-Cable (Virtual Audio Cable): https://vb-audio.com/Cable/")
        print("2. In Windows Sound settings, set VB-Cable as your default playback device")
        print("3. Select VB-Cable Output as your input device in this application")

    elif os_name == "Darwin":  # macOS
        print("macOS Setup:")
        print("1. Install Soundflower: https://github.com/mattingalls/Soundflower/releases")
        print("2. Open Audio MIDI Setup and create a Multi-Output Device")
        print("3. Set this Multi-Output Device as your default output")
        print("4. Select Soundflower (2ch) as your input device in this application")

    elif os_name == "Linux":
        print("Linux Setup:")
        print("1. Using PulseAudio: pactl load-module module-loopback latency_msec=1")
        print("2. Using PipeWire: pw-loopback")
        print("3. Select the appropriate monitor device in this application")

    print("========================================\n")


async def stream_audio(websocket_url=DEFAULT_WEBSOCKET_URL):
    """Capture and stream audio to the WebSocket server."""
    p = pyaudio.PyAudio()

    # Show setup instructions
    show_setup_instructions()

    # Print available devices
    print("Available audio devices:")
    for i in range(p.get_device_count()):
        dev_info = p.get_device_info_by_index(i)
        print(f"Device {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']}, outputs: {dev_info['maxOutputChannels']})")

    # Ask for loopback device index
    device_index = int(input("Enter the loopback device index: "))
    device_info = p.get_device_info_by_index(device_index)
    device_channels = device_info['maxInputChannels']
    if device_channels <= 0:
        print("Selected device has no input channels. Please choose a device with input capability.")
        p.terminate()
        return
    device_rate = int(device_info['defaultSampleRate'])

    # Open stream
    try:
        stream = p.open(
            format=FORMAT,
            channels=device_channels,
            rate=device_rate,
            input=True,
            frames_per_buffer=CHUNK,
            input_device_index=device_index
        )
    except Exception as e:
        print(f"Error opening stream: {e}")
        print("Please check your device selection and setup.")
        p.terminate()
        return

    print(f"Stream opened, connecting to WebSocket server at {websocket_url}...")

    # Prepare format info to be sent to the server upon connection
    format_info = {
        "event": "format",
        "data": {
            "channels": device_channels,
            "sample_rate": device_rate,
            "sample_width": pyaudio.get_sample_size(FORMAT),
            "format": "int16"
        }
    }

    try:
        while True:
            try:
                async with websockets.connect(websocket_url) as websocket:
                    print("Connected to WebSocket server, streaming audio...")
                    await websocket.send(json.dumps(format_info))
                    while True:
                        # Read audio data
                        audio_data = stream.read(CHUNK, exception_on_overflow=False)

                        # Convert to base64 for sending
                        encoded_data = base64.b64encode(audio_data).decode('utf-8')

                        # Create a message packet
                        message = {
                            "event": "audio",
                            "data": encoded_data
                        }

                        # Send to the server
                        await websocket.send(json.dumps(message))
            except Exception as e:
                print(f"WebSocket connection error: {e}")
                print("Attempting to reconnect in 3 seconds...")
                await asyncio.sleep(3)
    except Exception as e:
        print(f"Error: {e}")

    finally:
        # Clean up
        stream.stop_stream()
        stream.close()
        p.terminate()
        print("Audio streaming ended")


def main():
    """Parse command line arguments and start the audio streaming."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Client')
    parser.add_argument('--url', default=DEFAULT_WEBSOCKET_URL,
                        help=f'WebSocket server URL (default: {DEFAULT_WEBSOCKET_URL})')

    args = parser.parse_args()

    try:
        asyncio.run(stream_audio(args.url))
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
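
Assuming the client script is saved as client.py (our filename choice, not fixed by the code), it can be started like this; without --url it falls back to the hard-coded default address:

python client.py --url ws://192.168.100.51:8765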

Server

Server Code Explanation:

The server code sets up a WebSocket server that receives and processes audio streams from multiple clients. When a client connects, the server registers it and listens for two types of events: a 'format' event containing the audio configuration (channels, sample rate, sample width), and an 'audio' event carrying a base64-encoded audio chunk. Upon receiving audio data, the server optionally applies audio effects and stores the chunk for later saving to a WAV file. For real-time transcription it accumulates audio samples until a threshold is reached and then runs the Whisper model in a separate thread to transcribe the buffered segment. If audio playback is enabled and supported, the server plays the chunk using the pydub library. Each transcription result is printed as it completes. On disconnect, the server cleans up the client's data, saving the complete recording to a file if enabled.
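
For intuition, here is a minimal sketch of the buffering arithmetic the server uses before triggering a transcription, assuming the client's default format (44.1 kHz, 16-bit PCM, stereo):

# Raw PCM bytes per second of capture
sample_rate = 44100   # Hz
sample_width = 2      # bytes per sample (16-bit)
channels = 2          # stereo
bytes_per_second = sample_rate * sample_width * channels  # 176,400 bytes/s

# The server transcribes once roughly 1.5 seconds have accumulated
threshold = int(1.5 * bytes_per_second)  # 264,600 bytes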

Server Code:

#!/usr/bin/env python3
"""
Real-time Audio Streaming Server

This script receives audio streams from clients via WebSockets and can process,
play, or save the audio data.
"""

import asyncio
import websockets
import json
import base64
import wave
import os
import argparse
from datetime import datetime
import numpy as np
import io
import tempfile  # for temporary WAV file handling for transcription
import whisper
# Try to import optional libraries for audio playback
try:
    from pydub import AudioSegment
    from pydub.playback import play
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False
    print("pydub not available. Audio playback will be disabled.")

# Server parameters
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8765

# Storage for audio data
connected_clients = set()
recording_data = {}

# Load the Whisper model once at startup and hold it in a global for reuse
whisper_model = whisper.load_model("large")

# Processing options (defaults; overridden by command-line args in main)
SAVE_TO_FILE = True
PLAY_AUDIO = False
APPLY_EFFECTS = False


def apply_audio_effects(audio_bytes, format_info):
    """Apply audio effects to the audio data."""
    # Convert audio bytes to numpy array
    audio_array = np.frombuffer(audio_bytes, dtype=np.int16)

    # Reshape for stereo if needed
    if format_info["channels"] == 2:
        audio_array = audio_array.reshape(-1, 2)

    # Apply simple effect (e.g., gain)
    audio_array = audio_array * 1.5  # Increase volume by 50%

    # Clip to avoid distortion
    audio_array = np.clip(audio_array, -32768, 32767).astype(np.int16)

    # Convert back to bytes
    return audio_array.tobytes()


async def handle_client(websocket, path=None):
    """Handle a WebSocket client connection."""
    client_id = id(websocket)
    connected_clients.add(websocket)

    recording_data[client_id] = {
        "format_info": None,
        "audio_data": [],
        "transcription_buffer": b"",
        "transcription_in_progress": False
    }

    print(f"Client {client_id} connected")

    try:
        async for message in websocket:
            data = json.loads(message)
            event_type = data.get("event")

            if event_type == "format":
                # Store format information
                recording_data[client_id]["format_info"] = data.get("data")
                print(f"Received format info from client {client_id}: {data.get('data')}")

            elif event_type == "audio":
                # Decode audio data
                audio_bytes = base64.b64decode(data.get("data"))

                # Apply effects if enabled
                if APPLY_EFFECTS and recording_data[client_id]["format_info"]:
                    audio_bytes = apply_audio_effects(audio_bytes, recording_data[client_id]["format_info"])

                # Store audio data for potential saving
                if SAVE_TO_FILE:
                    recording_data[client_id]["audio_data"].append(audio_bytes)

                # Real-time transcription using OpenAI Whisper: accumulate audio for a longer segment
                if whisper_model is not None and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    # Calculate the number of bytes corresponding to 1.5 seconds of audio
                    bytes_per_second = format_info["sample_rate"] * format_info["sample_width"] * format_info["channels"]
                    threshold = int(1.5 * bytes_per_second)
                    # Accumulate the received audio_bytes into the transcription buffer
                    recording_data[client_id]["transcription_buffer"] += audio_bytes
                    # If the buffer has reached the threshold and no transcription is running, start transcription
                    if len(recording_data[client_id]["transcription_buffer"]) >= threshold and not recording_data[client_id]["transcription_in_progress"]:
                        buffer_to_transcribe = recording_data[client_id]["transcription_buffer"]
                        recording_data[client_id]["transcription_buffer"] = b""
                        #print(f"Triggering transcription for client {client_id} with buffer size {len(buffer_to_transcribe)}")
                        recording_data[client_id]["transcription_in_progress"] = True
                        asyncio.create_task(transcribe_audio(buffer_to_transcribe, format_info, client_id))
                else:
                    print("Skipping transcription: model not loaded or no format info received yet")

                # Play audio if enabled
                if PLAY_AUDIO and PYDUB_AVAILABLE and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    try:
                        # Create an in-memory WAV file
                        wav_file = io.BytesIO()
                        with wave.open(wav_file, 'wb') as wf:
                            wf.setnchannels(format_info["channels"])
                            wf.setsampwidth(format_info["sample_width"])
                            wf.setframerate(format_info["sample_rate"])
                            wf.writeframes(audio_bytes)
                        wav_file.seek(0)
                        # Play audio using pydub
                        audio_segment = AudioSegment.from_wav(wav_file)
                        play(audio_segment)
                    except Exception as e:
                        print(f"Error playing audio: {e}")

    except Exception as e:
        print(f"Error with client {client_id}: {e}")

    finally:
        connected_clients.remove(websocket)

        # Save audio data to file on disconnect
        if SAVE_TO_FILE and client_id in recording_data:
            save_to_file(client_id)

        if client_id in recording_data:
            del recording_data[client_id]

        print(f"Client {client_id} disconnected")


def save_to_file(client_id):
    """Save the recorded audio data to a WAV file."""
    if not recording_data[client_id]["audio_data"]:
        print(f"No audio data to save for client {client_id}")
        return

    format_info = recording_data[client_id]["format_info"]
    if not format_info:
        print(f"No format info available for client {client_id}")
        return

    # Create output directory if it doesn't exist
    os.makedirs("recordings", exist_ok=True)

    # Create a filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recordings/audio_recording_{client_id}_{timestamp}.wav"

    # Save to WAV file
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(format_info["channels"])
        wf.setsampwidth(format_info["sample_width"])
        wf.setframerate(format_info["sample_rate"])
        wf.writeframes(b''.join(recording_data[client_id]["audio_data"]))

    #print(f"Saved recording to {filename}")


async def transcribe_audio(audio_buffer, format_info, client_id):
    """Run transcription in a separate thread and reset the in-progress flag when done."""
    try:
        # Skip transcription if the audio is effectively silent
        # (cast to int32 first so abs() cannot overflow on -32768)
        audio_np = np.frombuffer(audio_buffer, dtype=np.int16).astype(np.int32)
        if np.max(np.abs(audio_np)) < 500:
            return ""
        # Write the buffered PCM data to a temporary WAV file for Whisper
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            temp_filename = tmp.name
        with wave.open(temp_filename, 'wb') as wf:
            wf.setnchannels(format_info["channels"])
            wf.setsampwidth(format_info["sample_width"])
            wf.setframerate(format_info["sample_rate"])
            wf.writeframes(audio_buffer)
        # Run the blocking Whisper call in a separate thread
        try:
            result = await asyncio.wait_for(
                asyncio.to_thread(whisper_model.transcribe, temp_filename),
                timeout=30
            )
        except asyncio.TimeoutError:
            print(f"Transcription timed out for client {client_id}")
            return ""
        finally:
            os.unlink(temp_filename)  # always remove the temporary file
        transcription_text = result["text"]
        print(f"Transcription (client {client_id}): {transcription_text}")
        return transcription_text
    finally:
        # Reset the flag; guard against the client having disconnected meanwhile
        if client_id in recording_data:
            recording_data[client_id]["transcription_in_progress"] = False


async def main():
    """Parse command line arguments and start the WebSocket server."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Server')
    parser.add_argument('--host', default=DEFAULT_HOST,
                        help=f'Host address (default: {DEFAULT_HOST})')
    parser.add_argument('--port', type=int, default=DEFAULT_PORT,
                        help=f'Port number (default: {DEFAULT_PORT})')
    parser.add_argument('--save', action='store_true',
                        help='Save audio to file')
    parser.add_argument('--play', action='store_true',
                        help='Play audio in real-time')
    parser.add_argument('--effects', action='store_true',
                        help='Apply audio effects')

    args = parser.parse_args()

    global SAVE_TO_FILE, PLAY_AUDIO, APPLY_EFFECTS
    SAVE_TO_FILE = args.save
    PLAY_AUDIO = args.play and PYDUB_AVAILABLE
    APPLY_EFFECTS = args.effects

    if args.play and not PYDUB_AVAILABLE:
        print("Warning: pydub is not available. Audio playback is disabled.")



    server = await websockets.serve(handle_client, args.host, args.port)
    print(f"WebSocket server started at ws://{args.host}:{args.port}")
    print(f"Save to file: {SAVE_TO_FILE}, Play audio: {PLAY_AUDIO}, Apply effects: {APPLY_EFFECTS}")

    await server.wait_closed()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")

In later articles we will also explore using the transcription text to let the agent learn more about entities and events, remember persons, and more. So stay tuned, and do not forget to subscribe.


DISCLAIMER: The provided code does not represent a production-ready setup with regard to security and stability.
All code presented in this tutorial is used at your own risk.
Always consider performing security audits before putting any code into production.

Furthermore, no part of this tutorial or its code should be considered financial advice.
Always consult a professional investment advisor before making any investment decisions.


At CorticalFlow, expanding the cognitive ability of the user is our mission.
