AI Case Study: Implementing a Real-Time Financial News Agent - Part 4: Real-Time Transcription

published on March 30, 2025

Image: a microphone used for speech transcription

In this part, we use a real-time transcription service to transcribe the audio of a news broadcast. We use a client/server model so that the setup can later scale to multiple streams more easily.

Why transcribe speech at all?

Plain-text news is already evaluated by many algorithms today, so it offers us little in the way of new insight.

We go closer to the source and work with unstructured data, extracting everything we can from the broadcasts themselves.

This gives our AI agent far more data and context than evaluating curated news feeds alone.

Architecture

Implementation

Client

Explanation of the client code:

The client code captures your system's audio output using the PyAudio library. It first lists the available audio devices and prompts you to select a loopback device (a virtual input that captures the system sound). Once a device is selected, the code opens an audio stream and collects the key configuration details: the number of channels, the sample rate, and the sample width. This information is sent to the WebSocket server first, so that the server can interpret the incoming audio data correctly. The client then continuously reads small chunks of audio, encodes them in Base64, and sends them over the open WebSocket connection. It also handles errors and dropped connections by trying to reconnect after 3 seconds, which keeps the audio stream running across interruptions.
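
As a minimal sketch of the message protocol described above, the two JSON envelopes can be built and decoded with the standard library alone. The helper names (make_format_message, make_audio_message, decode_audio_message) are illustrative and do not appear in the original code; the event names and fields mirror the ones the client sends.

```python
import base64
import json


def make_format_message(channels, sample_rate, sample_width):
    """Build the one-time 'format' message that describes the PCM stream."""
    return json.dumps({
        "event": "format",
        "data": {
            "channels": channels,
            "sample_rate": sample_rate,
            "sample_width": sample_width,
            "format": "int16",
        },
    })


def make_audio_message(pcm_chunk):
    """Wrap one raw PCM chunk in a JSON 'audio' message with a Base64 payload."""
    return json.dumps({
        "event": "audio",
        "data": base64.b64encode(pcm_chunk).decode("ascii"),
    })


def decode_audio_message(raw_message):
    """Return the raw PCM bytes of an 'audio' message, or None for other events."""
    message = json.loads(raw_message)
    if message.get("event") != "audio":
        return None
    return base64.b64decode(message["data"])


if __name__ == "__main__":
    chunk = bytes(range(8))  # stand-in for two stereo frames of 16-bit audio
    wire = make_audio_message(chunk)
    # The decoded payload should be byte-for-byte identical to the input
    print(decode_audio_message(wire) == chunk)
```

Base64 inside JSON keeps the protocol easy to inspect, at the cost of roughly a third more bytes on the wire than the raw PCM data.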

Client code:

#!/usr/bin/env python3
"""
Real-time Audio Streaming Client

This script captures system audio output and streams it to a WebSocket server.
"""

import asyncio
import websockets
import pyaudio
import base64
import json
import platform
import argparse

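# Audio parameters
CHUNK = 1024  # Number of frames per buffer
FORMAT = pyaudio.paInt16  # Audio format (16-bit PCM)
# CHANNELS and RATE are nominal defaults only: stream_audio() uses the channel
# count and sample rate reported by the selected device instead.
CHANNELS = 2  # Stereo
RATE = 44100  # Sample rate (Hz)
DEFAULT_WEBSOCKET_URL = "ws://192.168.100.51:8765"  # Example LAN address; override with --url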


def show_setup_instructions():
    """Display platform-specific loopback setup instructions."""
    os_name = platform.system()
    print("\n=== Audio Loopback Setup Instructions ===")

    if os_name == "Windows":
        print("Windows Setup:")
        print("1. Install VB-Cable (Virtual Audio Cable): https://vb-audio.com/Cable/")
        print("2. In Windows Sound settings, set VB-Cable as your default playback device")
        print("3. Select VB-Cable Output as your input device in this application")

    elif os_name == "Darwin":  # macOS
        print("macOS Setup:")
        print("1. Install Soundflower: https://github.com/mattingalls/Soundflower/releases")
        print("2. Open Audio MIDI Setup and create a Multi-Output Device")
        print("3. Set this Multi-Output Device as your default output")
        print("4. Select Soundflower (2ch) as your input device in this application")

    elif os_name == "Linux":
        print("Linux Setup:")
        print("1. Using PulseAudio: pactl load-module module-loopback latency_msec=1")
        print("2. Using PipeWire: pw-loopback")
        print("3. Select the appropriate monitor device in this application")

    print("========================================\n")


async def stream_audio(websocket_url=DEFAULT_WEBSOCKET_URL):
    """Capture and stream audio to the WebSocket server."""
    p = pyaudio.PyAudio()

    # Show setup instructions
    show_setup_instructions()

    # Print available devices
    print("Available audio devices:")
    for i in range(p.get_device_count()):
        dev_info = p.get_device_info_by_index(i)
        print(f"Device {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']}, outputs: {dev_info['maxOutputChannels']})")

    # Ask for loopback device index
    device_index = int(input("Enter the loopback device index: "))
    device_info = p.get_device_info_by_index(device_index)
    device_channels = device_info['maxInputChannels']
    if device_channels <= 0:
        print("Selected device has no input channels. Please choose a device with input capability.")
        p.terminate()
        return
    device_rate = int(device_info['defaultSampleRate'])

    # Open stream
    try:
        stream = p.open(
            format=FORMAT,
            channels=device_channels,
            rate=device_rate,
            input=True,
            frames_per_buffer=CHUNK,
            input_device_index=device_index
        )
    except Exception as e:
        print(f"Error opening stream: {e}")
        print("Please check your device selection and setup.")
        p.terminate()
        return

    print(f"Stream opened, connecting to WebSocket server at {websocket_url}...")

    # Prepare format info to be sent to the server upon connection
    format_info = {
        "event": "format",
        "data": {
            "channels": device_channels,
            "sample_rate": device_rate,
            "sample_width": pyaudio.get_sample_size(FORMAT),
            "format": "int16"
        }
    }

    try:
        while True:
            try:
                async with websockets.connect(websocket_url) as websocket:
                    print("Connected to WebSocket server, streaming audio...")
                    await websocket.send(json.dumps(format_info))
                    while True:
                        # Read audio data
                        audio_data = stream.read(CHUNK, exception_on_overflow=False)

                        # Convert to base64 for sending
                        encoded_data = base64.b64encode(audio_data).decode('utf-8')

                        # Create a message packet
                        message = {
                            "event": "audio",
                            "data": encoded_data
                        }

                        # Send to the server
                        await websocket.send(json.dumps(message))
            except Exception as e:
                print(f"WebSocket connection error: {e}")
                print("Attempting to reconnect in 3 seconds...")
                await asyncio.sleep(3)
    except Exception as e:
        print(f"Error: {e}")

    finally:
        # Clean up
        stream.stop_stream()
        stream.close()
        p.terminate()
        print("Audio streaming ended")


def main():
    """Parse command line arguments and start the audio streaming."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Client')
    parser.add_argument('--url', default=DEFAULT_WEBSOCKET_URL,
                        help=f'WebSocket server URL (default: {DEFAULT_WEBSOCKET_URL})')

    args = parser.parse_args()

    try:
        asyncio.run(stream_audio(args.url))
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
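
The client above waits a fixed 3 seconds before every reconnect attempt. If several clients stream to one server, a growing delay with a little randomness can be gentler on a server that is restarting. The following is a sketch of that alternative, not part of the original client; the function name and its default values are assumptions chosen for illustration.

```python
import random


def backoff_delays(base=1.0, factor=2.0, max_delay=30.0, jitter=0.1):
    """Yield reconnect delays that grow exponentially up to max_delay."""
    delay = base
    while True:
        # Vary each delay by up to +/- jitter (a fraction of the delay)
        # so that many clients do not retry in lockstep
        yield delay * (1 + random.uniform(-jitter, jitter))
        delay = min(delay * factor, max_delay)


if __name__ == "__main__":
    # With jitter disabled the sequence is deterministic
    delays = backoff_delays(jitter=0.0)
    print([next(delays) for _ in range(7)])
```

In the reconnect loop, asyncio.sleep(3) would become asyncio.sleep(next(delays)), with a fresh generator created after each successful connection so the delay starts small again.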

Server

Explanation of the server code:

The server code sets up a WebSocket server that receives and processes audio streams from multiple clients. When a client connects, the server registers it and listens for two kinds of events: a 'format' event carrying the audio configuration (channels, sample rate, and so on) and 'audio' events carrying Base64-encoded audio chunks. For each chunk, the server optionally applies audio effects and, if saving is enabled, keeps the data so it can later be written to a WAV file. It also performs near-real-time transcription by accumulating audio until a threshold of 1.5 seconds is reached and then running the Whisper model in a separate thread. If audio playback is enabled and supported, the server plays the audio using the pydub library. The transcribed text is printed once transcription completes. On disconnect, the server cleans up the client's data and, if enabled, saves the full recording to a file.
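
The accumulation step can be illustrated in isolation. The sketch below computes the byte threshold the same way the server does (sample rate times sample width times channels, times 1.5 seconds) and releases a segment once enough audio has arrived. The SegmentBuffer class is a hypothetical helper for illustration; the server itself keeps the buffer in a per-client dictionary and additionally checks that no transcription is already running.

```python
class SegmentBuffer:
    """Collect PCM chunks and release them in segments of a minimum duration."""

    def __init__(self, sample_rate, sample_width, channels, seconds=1.5):
        bytes_per_second = sample_rate * sample_width * channels
        self.threshold = int(seconds * bytes_per_second)
        self._buffer = bytearray()

    def add(self, chunk):
        """Append a chunk; return the buffered segment once the threshold is reached."""
        self._buffer.extend(chunk)
        if len(self._buffer) < self.threshold:
            return None
        segment = bytes(self._buffer)
        self._buffer.clear()
        return segment


if __name__ == "__main__":
    # 44.1 kHz, 16-bit, stereo: 176,400 bytes per second
    buf = SegmentBuffer(sample_rate=44100, sample_width=2, channels=2)
    print(buf.threshold)

    chunk = b"\x00" * 4096  # 1024 frames * 2 channels * 2 bytes
    released = [buf.add(chunk) for _ in range(65)]
    # Count how many of the 65 calls handed back a full segment
    print(sum(segment is not None for segment in released))
```

A short segment keeps latency low, but it also gives the model little context and can cut words in half at segment boundaries, so the 1.5 second value is a trade-off worth experimenting with.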

Server code:


#!/usr/bin/env python3
"""
Real-time Audio Streaming Server

This script receives audio streams from clients via WebSockets and can process,
play, or save the audio data.
"""

import asyncio
import websockets
import json
import base64
import wave
import os
import argparse
from datetime import datetime
import numpy as np
import io
import tempfile  # for temporary WAV file handling for transcription
import whisper
# Try to import optional libraries for audio playback
try:
    from pydub import AudioSegment
    from pydub.playback import play
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False
    print("pydub not available. Audio playback will be disabled.")

# Server parameters
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8765

# Storage for audio data
connected_clients = set()
recording_data = {}

# Load the Whisper model once at import time so that all clients share it.
# The model weights are downloaded on first use.
whisper_model = whisper.load_model("large")

# Processing options. These are placeholders: main() overwrites all three
# from the --save, --play and --effects command line flags.
SAVE_TO_FILE = True
PLAY_AUDIO = False
APPLY_EFFECTS = False


def apply_audio_effects(audio_bytes, format_info):
    """Apply audio effects to the audio data."""
    # Convert audio bytes to numpy array
    audio_array = np.frombuffer(audio_bytes, dtype=np.int16)

    # Reshape for stereo if needed
    if format_info["channels"] == 2:
        audio_array = audio_array.reshape(-1, 2)

    # Apply simple effect (e.g., gain)
    audio_array = audio_array * 1.5  # Increase volume by 50%

    # Clip to avoid distortion
    audio_array = np.clip(audio_array, -32768, 32767).astype(np.int16)

    # Convert back to bytes
    return audio_array.tobytes()


async def handle_client(websocket, path=None):
    """Handle a WebSocket client connection."""
    client_id = id(websocket)
    connected_clients.add(websocket)

    recording_data[client_id] = {
        "format_info": None,
        "audio_data": [],
        "transcription_buffer": b"",
        "transcription_in_progress": False
    }

    print(f"Client {client_id} connected")

    try:
        async for message in websocket:
            data = json.loads(message)
            event_type = data.get("event")

            if event_type == "format":
                # Store format information
                recording_data[client_id]["format_info"] = data.get("data")
                print(f"Received format info from client {client_id}: {data.get('data')}")

            elif event_type == "audio":

                #print('EVENT AUDIO')

                # Decode audio data
                audio_bytes = base64.b64decode(data.get("data"))

                # Apply effects if enabled
                if APPLY_EFFECTS and recording_data[client_id]["format_info"]:
                    audio_bytes = apply_audio_effects(audio_bytes, recording_data[client_id]["format_info"])

                # Store audio data for potential saving
                if SAVE_TO_FILE:
                    recording_data[client_id]["audio_data"].append(audio_bytes)

                # Real-time transcription using OpenAI Whisper: accumulate audio for a longer segment
                if whisper_model is not None and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    # Calculate the number of bytes corresponding to 1.5 seconds of audio
                    bytes_per_second = format_info["sample_rate"] * format_info["sample_width"] * format_info["channels"]
                    threshold = int(1.5 * bytes_per_second)
                    # Accumulate the received audio_bytes into the transcription buffer
                    recording_data[client_id]["transcription_buffer"] += audio_bytes
                    # If the buffer has reached the threshold and no transcription is running, start transcription
                    if len(recording_data[client_id]["transcription_buffer"]) >= threshold and not recording_data[client_id]["transcription_in_progress"]:
                        buffer_to_transcribe = recording_data[client_id]["transcription_buffer"]
                        recording_data[client_id]["transcription_buffer"] = b""
                        #print(f"Triggering transcription for client {client_id} with buffer size {len(buffer_to_transcribe)}")
                        recording_data[client_id]["transcription_in_progress"] = True
                        asyncio.create_task(transcribe_audio(buffer_to_transcribe, format_info, client_id))
                else:
                    print("Skipping transcription: Whisper model not loaded or format info not received yet")

                # Play audio if enabled
                if PLAY_AUDIO and PYDUB_AVAILABLE and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    try:
                        # Create an in-memory WAV file
                        wav_file = io.BytesIO()
                        with wave.open(wav_file, 'wb') as wf:
                            wf.setnchannels(format_info["channels"])
                            wf.setsampwidth(format_info["sample_width"])
                            wf.setframerate(format_info["sample_rate"])
                            wf.writeframes(audio_bytes)
                        wav_file.seek(0)
                        # Play audio using pydub
                        audio_segment = AudioSegment.from_wav(wav_file)
                        play(audio_segment)
                    except Exception as e:
                        print(f"Error playing audio: {e}")

    except Exception as e:
        print(f"Error with client {client_id}: {e}")

    finally:
        connected_clients.remove(websocket)

        # Save audio data to file on disconnect
        if SAVE_TO_FILE and client_id in recording_data:
            save_to_file(client_id)

        if client_id in recording_data:
            del recording_data[client_id]

        print(f"Client {client_id} disconnected")


def save_to_file(client_id):
    """Save the recorded audio data to a WAV file."""
    if not recording_data[client_id]["audio_data"]:
        print(f"No audio data to save for client {client_id}")
        return

    format_info = recording_data[client_id]["format_info"]
    if not format_info:
        print(f"No format info available for client {client_id}")
        return

    # Create output directory if it doesn't exist
    os.makedirs("recordings", exist_ok=True)

    # Create a filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recordings/audio_recording_{client_id}_{timestamp}.wav"

    # Save to WAV file
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(format_info["channels"])
        wf.setsampwidth(format_info["sample_width"])
        wf.setframerate(format_info["sample_rate"])
        wf.writeframes(b''.join(recording_data[client_id]["audio_data"]))

    #print(f"Saved recording to {filename}")


async def transcribe_audio(audio_buffer, format_info, client_id):
    """Run transcription in a separate thread and return the transcribed text."""
    temp_filename = None
    try:
        # Skip effectively silent audio. Cast to int32 first so that
        # abs(-32768) does not overflow the int16 range.
        audio_np = np.frombuffer(audio_buffer, dtype=np.int16).astype(np.int32)
        if audio_np.size == 0 or np.max(np.abs(audio_np)) < 500:
            return ""
        # Write the buffer to a temporary WAV file that Whisper can read
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            temp_filename = tmp.name
        with wave.open(temp_filename, 'wb') as wf:
            wf.setnchannels(format_info["channels"])
            wf.setsampwidth(format_info["sample_width"])
            wf.setframerate(format_info["sample_rate"])
            wf.writeframes(audio_buffer)
        # Run the blocking transcription in a worker thread so the event loop stays responsive
        result = await asyncio.wait_for(asyncio.to_thread(whisper_model.transcribe, temp_filename), timeout=30)
        transcription_text = result["text"]
        print(f"Transcription (client {client_id}): {transcription_text}")
        return transcription_text
    except asyncio.TimeoutError:
        print(f"Transcription timed out for client {client_id}")
        return ""
    except Exception as e:
        # This coroutine runs as a background task, so report errors here
        print(f"Transcription failed for client {client_id}: {e}")
        return ""
    finally:
        # Always delete the temporary file and reset the flag, even on errors.
        # The client may have disconnected in the meantime, so guard the lookup.
        if temp_filename and os.path.exists(temp_filename):
            os.unlink(temp_filename)
        if client_id in recording_data:
            recording_data[client_id]["transcription_in_progress"] = False


async def main():
    """Parse command line arguments and start the WebSocket server."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Server')
    parser.add_argument('--host', default=DEFAULT_HOST,
                        help=f'Host address (default: {DEFAULT_HOST})')
    parser.add_argument('--port', type=int, default=DEFAULT_PORT,
                        help=f'Port number (default: {DEFAULT_PORT})')
    parser.add_argument('--save', action='store_true',
                        help='Save audio to file')
    parser.add_argument('--play', action='store_true',
                        help='Play audio in real-time')
    parser.add_argument('--effects', action='store_true',
                        help='Apply audio effects')

    args = parser.parse_args()

    global SAVE_TO_FILE, PLAY_AUDIO, APPLY_EFFECTS
    SAVE_TO_FILE = args.save
    PLAY_AUDIO = args.play and PYDUB_AVAILABLE
    APPLY_EFFECTS = args.effects

    if args.play and not PYDUB_AVAILABLE:
        print("Warning: pydub is not available. Audio playback is disabled.")



    server = await websockets.serve(handle_client, args.host, args.port)
    print(f"WebSocket server started at ws://{args.host}:{args.port}")
    print(f"Save to file: {SAVE_TO_FILE}, Play audio: {PLAY_AUDIO}, Apply effects: {APPLY_EFFECTS}")

    await server.wait_closed()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")
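
Both the file export and the playback path in the server wrap raw PCM frames in a WAV container using the client's format info. A quick sanity check for that format info is to wrap a known amount of audio and read the duration back. The sketch below uses only the standard library; the two helper names are hypothetical and not part of the server.

```python
import io
import wave


def pcm_to_wav_bytes(pcm_bytes, channels, sample_width, sample_rate):
    """Wrap raw PCM frames in a WAV container and return the file contents."""
    wav_file = io.BytesIO()
    with wave.open(wav_file, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(sample_rate)
        wf.writeframes(pcm_bytes)
    return wav_file.getvalue()


def wav_duration_seconds(wav_bytes):
    """Read the header of an in-memory WAV file and return its duration."""
    with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
        return wf.getnframes() / wf.getframerate()


if __name__ == "__main__":
    # 44,100 stereo frames of 16-bit silence, i.e. one second of audio
    one_second = b"\x00\x00" * 2 * 44100
    wav_bytes = pcm_to_wav_bytes(one_second, channels=2, sample_width=2, sample_rate=44100)
    print(wav_duration_seconds(wav_bytes))
```

If the duration computed this way does not match the wall-clock length of a recording, the channel count or sample rate sent in the 'format' event most likely differs from what the device actually delivers.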

In later articles, we will also explore how to use the transcribed text to let the agent learn more about entities, events, people, and so on. Stay tuned, and don't forget to subscribe.


DISCLAIMER: The code provided is not a production-ready setup in terms of security and stability.
All code presented in this tutorial is used at your own risk.
Always carry out security audits before putting any code into production.

Furthermore, no part of the tutorials or code content should be considered financial advice.
Always consult a professional investment advisor before making investment decisions.


At CorticalFlow, expanding the user's cognitive abilities is our mission.

