
In this part we will use a real-time transcription service to transcribe the audio of the news broadcast. We use a client/server model so that this can later be scaled more easily to multiple streams.
Why transcribe speech at all?
These days, plain text news is already analyzed by many algorithms, which means there is little new insight left for us to extract from it.
We go closer to the source and work with unstructured data, extracting everything we can from the broadcasts themselves.
This gives our AI agent far more data and context than evaluating curated news feeds alone.
Architecture
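The setup consists of two parts: a lightweight client that captures the system's audio output (for example, a running news stream) via a loopback device and pushes it as Base64-encoded PCM chunks over a WebSocket connection, and a server that receives these chunks, optionally stores or plays them back, and transcribes them in near real time with OpenAI's Whisper model. Because the server accepts multiple WebSocket connections and keeps per-client state, additional streams can later be handled by simply starting more clients.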
Implementation
Client
Explanation of the client code:
The client code captures your system's audio output using the PyAudio library. It first lists the available audio devices and asks you to select a loopback device (a virtual input that captures the system sound). Once a device has been selected, the code opens an audio stream and reads the key configuration details such as the number of channels, the sample rate, and the sample width. This information is sent to a WebSocket server so that the server can process the incoming audio data correctly. The client then continuously reads small chunks of audio, encodes them in Base64, and transmits them over an open WebSocket connection. It also handles potential errors and connection drops by attempting to reconnect, which keeps the real-time audio stream robust.
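Before diving into the full listing, here is a minimal sketch of the two JSON message types the client sends over the WebSocket, taken from the code below; the concrete values are examples only:

# Sent once after connecting: describes the raw PCM format of the stream.
format_message = {
    "event": "format",
    "data": {
        "channels": 2,          # e.g. a stereo loopback device
        "sample_rate": 44100,   # Hz, taken from the selected device
        "sample_width": 2,      # bytes per sample (16-bit PCM)
        "format": "int16"
    }
}

# Sent continuously afterwards: one Base64-encoded chunk of raw audio per message.
audio_message = {
    "event": "audio",
    "data": "<base64-encoded PCM chunk>"
}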
Client code:
#!/usr/bin/env python3
"""
Real-time Audio Streaming Client
This script captures system audio output and streams it to a WebSocket server.
"""
import asyncio
import websockets
import pyaudio
import base64
import json
import platform
import argparse

# Audio parameters
CHUNK = 1024              # Number of frames per buffer
FORMAT = pyaudio.paInt16  # Audio format (16-bit PCM)
CHANNELS = 2              # Stereo
RATE = 44100              # Sample rate (Hz)
DEFAULT_WEBSOCKET_URL = "ws://192.168.100.51:8765"


def show_setup_instructions():
    """Display platform-specific loopback setup instructions."""
    os_name = platform.system()
    print("\n=== Audio Loopback Setup Instructions ===")
    if os_name == "Windows":
        print("Windows Setup:")
        print("1. Install VB-Cable (Virtual Audio Cable): https://vb-audio.com/Cable/")
        print("2. In Windows Sound settings, set VB-Cable as your default playback device")
        print("3. Select VB-Cable Output as your input device in this application")
    elif os_name == "Darwin":  # macOS
        print("macOS Setup:")
        print("1. Install Soundflower: https://github.com/mattingalls/Soundflower/releases")
        print("2. Open Audio MIDI Setup and create a Multi-Output Device")
        print("3. Set this Multi-Output Device as your default output")
        print("4. Select Soundflower (2ch) as your input device in this application")
    elif os_name == "Linux":
        print("Linux Setup:")
        print("1. Using PulseAudio: pactl load-module module-loopback latency_msec=1")
        print("2. Using PipeWire: pw-loopback")
        print("3. Select the appropriate monitor device in this application")
    print("========================================\n")


async def stream_audio(websocket_url=DEFAULT_WEBSOCKET_URL):
    """Capture and stream audio to the WebSocket server."""
    p = pyaudio.PyAudio()

    # Show setup instructions
    show_setup_instructions()

    # Print available devices
    print("Available audio devices:")
    for i in range(p.get_device_count()):
        dev_info = p.get_device_info_by_index(i)
        print(f"Device {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']}, outputs: {dev_info['maxOutputChannels']})")

    # Ask for loopback device index
    device_index = int(input("Enter the loopback device index: "))
    device_info = p.get_device_info_by_index(device_index)
    device_channels = device_info['maxInputChannels']
    if device_channels <= 0:
        print("Selected device has no input channels. Please choose a device with input capability.")
        p.terminate()
        return
    device_rate = int(device_info['defaultSampleRate'])

    # Open stream
    try:
        stream = p.open(
            format=FORMAT,
            channels=device_channels,
            rate=device_rate,
            input=True,
            frames_per_buffer=CHUNK,
            input_device_index=device_index
        )
    except Exception as e:
        print(f"Error opening stream: {e}")
        print("Please check your device selection and setup.")
        p.terminate()
        return

    print(f"Stream opened, connecting to WebSocket server at {websocket_url}...")

    # Prepare format info to be sent to the server upon connection
    format_info = {
        "event": "format",
        "data": {
            "channels": device_channels,
            "sample_rate": device_rate,
            "sample_width": pyaudio.get_sample_size(FORMAT),
            "format": "int16"
        }
    }

    try:
        while True:
            try:
                async with websockets.connect(websocket_url) as websocket:
                    print("Connected to WebSocket server, streaming audio...")
                    await websocket.send(json.dumps(format_info))
                    while True:
                        # Read audio data
                        audio_data = stream.read(CHUNK, exception_on_overflow=False)
                        # Convert to base64 for sending
                        encoded_data = base64.b64encode(audio_data).decode('utf-8')
                        # Create a message packet
                        message = {
                            "event": "audio",
                            "data": encoded_data
                        }
                        # Send to the server
                        await websocket.send(json.dumps(message))
            except Exception as e:
                print(f"WebSocket connection error: {e}")
                print("Attempting to reconnect in 3 seconds...")
                await asyncio.sleep(3)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Clean up
        stream.stop_stream()
        stream.close()
        p.terminate()
        print("Audio streaming ended")


def main():
    """Parse command line arguments and start the audio streaming."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Client')
    parser.add_argument('--url', default=DEFAULT_WEBSOCKET_URL,
                        help=f'WebSocket server URL (default: {DEFAULT_WEBSOCKET_URL})')
    args = parser.parse_args()
    try:
        asyncio.run(stream_audio(args.url))
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
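A possible way to run the client (the filename audio_client.py is just an example, not part of the original code): install the dependencies with pip (pyaudio and websockets), start it with python audio_client.py --url ws://<server-ip>:8765, and enter the index of your loopback device when prompted. Note that the hard-coded DEFAULT_WEBSOCKET_URL points to a LAN address from this particular setup, so you will almost certainly want to pass --url.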
Server
Explanation of the server code:
The server code sets up a WebSocket server that receives and processes audio streams from multiple clients. When a client connects, the server registers it and waits for two types of events: a 'format' event carrying the audio configuration (channels, sample rate, and so on) and 'audio' events carrying Base64-encoded audio chunks. On receiving audio data, the server optionally applies audio effects and buffers the data so it can later be written to a WAV file. It also performs real-time transcription: it accumulates audio samples until a threshold is reached and then runs the Whisper model in a separate thread to transcribe the segment. If audio playback is enabled and supported, the server plays the audio back using the pydub library. After transcription, the transcribed text is printed. The server also cleans up client state on disconnect and, if requested, saves the complete audio recording to a file.
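To make the buffering logic concrete, here is a small worked example of the 1.5-second threshold calculation the server performs, assuming a 16-bit stereo stream at 44.1 kHz (typical loopback settings, matching the client's constants); the server recomputes these values from the format info each client sends:

# Worked example of the server's transcription threshold.
sample_rate = 44100    # Hz
sample_width = 2       # bytes per sample (16-bit PCM)
channels = 2           # stereo

bytes_per_second = sample_rate * sample_width * channels  # 176,400 bytes/s
threshold = int(1.5 * bytes_per_second)                   # 264,600 bytes ≈ 1.5 s of audio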
Server code:
#!/usr/bin/env python3
"""
Real-time Audio Streaming Server
This script receives audio streams from clients via WebSockets and can process,
play, or save the audio data.
"""
import asyncio
import websockets
import json
import base64
import wave
import os
import argparse
from datetime import datetime
import numpy as np
import io
import tempfile  # for temporary WAV file handling for transcription
import whisper

# Try to import optional libraries for audio playback
try:
    from pydub import AudioSegment
    from pydub.playback import play
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False
    print("pydub not available. Audio playback will be disabled.")

# Server parameters
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8765

# Storage for audio data
connected_clients = set()
recording_data = {}

# Global variable to hold the Whisper model once loaded
whisper_model = whisper.load_model("large")

# Processing options (can be set via command line args)
SAVE_TO_FILE = True
PLAY_AUDIO = False
APPLY_EFFECTS = False


def apply_audio_effects(audio_bytes, format_info):
    """Apply audio effects to the audio data."""
    # Convert audio bytes to numpy array
    audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
    # Reshape for stereo if needed
    if format_info["channels"] == 2:
        audio_array = audio_array.reshape(-1, 2)
    # Apply simple effect (e.g., gain)
    audio_array = audio_array * 1.5  # Increase volume by 50%
    # Clip to avoid distortion
    audio_array = np.clip(audio_array, -32768, 32767).astype(np.int16)
    # Convert back to bytes
    return audio_array.tobytes()


async def handle_client(websocket, path=None):
    """Handle a WebSocket client connection."""
    client_id = id(websocket)
    connected_clients.add(websocket)
    recording_data[client_id] = {
        "format_info": None,
        "audio_data": [],
        "transcription_buffer": b"",
        "transcription_in_progress": False
    }
    print(f"Client {client_id} connected")
    try:
        async for message in websocket:
            data = json.loads(message)
            event_type = data.get("event")
            if event_type == "format":
                # Store format information
                recording_data[client_id]["format_info"] = data.get("data")
                print(f"Received format info from client {client_id}: {data.get('data')}")
            elif event_type == "audio":
                #print('EVENT AUDIO')
                # Decode audio data
                audio_bytes = base64.b64decode(data.get("data"))
                # Apply effects if enabled
                if APPLY_EFFECTS and recording_data[client_id]["format_info"]:
                    audio_bytes = apply_audio_effects(audio_bytes, recording_data[client_id]["format_info"])
                # Store audio data for potential saving
                if SAVE_TO_FILE:
                    recording_data[client_id]["audio_data"].append(audio_bytes)
                # Real-time transcription using OpenAI Whisper: accumulate audio for a longer segment
                if whisper_model is not None and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    # Calculate the number of bytes corresponding to 1.5 seconds of audio
                    bytes_per_second = format_info["sample_rate"] * format_info["sample_width"] * format_info["channels"]
                    threshold = int(1.5 * bytes_per_second)
                    # Accumulate the received audio_bytes into the transcription buffer
                    recording_data[client_id]["transcription_buffer"] += audio_bytes
                    # If the buffer has reached the threshold and no transcription is running, start transcription
                    if len(recording_data[client_id]["transcription_buffer"]) >= threshold and not recording_data[client_id]["transcription_in_progress"]:
                        buffer_to_transcribe = recording_data[client_id]["transcription_buffer"]
                        recording_data[client_id]["transcription_buffer"] = b""
                        #print(f"Triggering transcription for client {client_id} with buffer size {len(buffer_to_transcribe)}")
                        recording_data[client_id]["transcription_in_progress"] = True
                        asyncio.create_task(transcribe_audio(buffer_to_transcribe, format_info, client_id))
                else:
                    print("whisper_model NONE")
                # Play audio if enabled
                if PLAY_AUDIO and PYDUB_AVAILABLE and recording_data[client_id]["format_info"]:
                    format_info = recording_data[client_id]["format_info"]
                    try:
                        # Create an in-memory WAV file
                        wav_file = io.BytesIO()
                        with wave.open(wav_file, 'wb') as wf:
                            wf.setnchannels(format_info["channels"])
                            wf.setsampwidth(format_info["sample_width"])
                            wf.setframerate(format_info["sample_rate"])
                            wf.writeframes(audio_bytes)
                        wav_file.seek(0)
                        # Play audio using pydub
                        audio_segment = AudioSegment.from_wav(wav_file)
                        play(audio_segment)
                    except Exception as e:
                        print(f"Error playing audio: {e}")
    except Exception as e:
        print(f"Error with client {client_id}: {e}")
    finally:
        connected_clients.remove(websocket)
        # Save audio data to file on disconnect
        if SAVE_TO_FILE and client_id in recording_data:
            save_to_file(client_id)
        if client_id in recording_data:
            del recording_data[client_id]
        print(f"Client {client_id} disconnected")


def save_to_file(client_id):
    """Save the recorded audio data to a WAV file."""
    if not recording_data[client_id]["audio_data"]:
        print(f"No audio data to save for client {client_id}")
        return
    format_info = recording_data[client_id]["format_info"]
    if not format_info:
        print(f"No format info available for client {client_id}")
        return
    # Create output directory if it doesn't exist
    os.makedirs("recordings", exist_ok=True)
    # Create a filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recordings/audio_recording_{client_id}_{timestamp}.wav"
    # Save to WAV file
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(format_info["channels"])
        wf.setsampwidth(format_info["sample_width"])
        wf.setframerate(format_info["sample_rate"])
        wf.writeframes(b''.join(recording_data[client_id]["audio_data"]))
    #print(f"Saved recording to {filename}")


async def transcribe_audio(audio_buffer, format_info, client_id):
    """Run transcription in a separate thread and handle subsequent buffered audio."""
    #print(f"Starting transcription for client {client_id}, buffer length: {len(audio_buffer)}")
    # Check if the audio is effectively silent; if so, return an empty transcription
    audio_np = np.frombuffer(audio_buffer, dtype=np.int16)
    if np.max(np.abs(audio_np)) < 500:
        #print(f"Audio for client {client_id} appears silent. Returning empty transcription.")
        recording_data[client_id]["transcription_in_progress"] = False
        return ""
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        temp_filename = tmp.name
    with wave.open(temp_filename, 'wb') as wf:
        wf.setnchannels(format_info["channels"])
        wf.setsampwidth(format_info["sample_width"])
        wf.setframerate(format_info["sample_rate"])
        wf.writeframes(audio_buffer)
    # Run transcription in a separate thread
    try:
        result = await asyncio.wait_for(asyncio.to_thread(whisper_model.transcribe, temp_filename), timeout=30)
    except asyncio.TimeoutError:
        print(f"Transcription timed out for client {client_id}")
        os.unlink(temp_filename)
        recording_data[client_id]["transcription_in_progress"] = False
        return ""
    transcription_text = result["text"]
    print(f"Transcription (client {client_id}): {transcription_text}")
    os.unlink(temp_filename)  # manually delete the temporary file
    # Reset the transcription_in_progress flag
    recording_data[client_id]["transcription_in_progress"] = False
    return transcription_text


async def main():
    """Parse command line arguments and start the WebSocket server."""
    parser = argparse.ArgumentParser(description='Audio WebSocket Server')
    parser.add_argument('--host', default=DEFAULT_HOST,
                        help=f'Host address (default: {DEFAULT_HOST})')
    parser.add_argument('--port', type=int, default=DEFAULT_PORT,
                        help=f'Port number (default: {DEFAULT_PORT})')
    parser.add_argument('--save', action='store_true',
                        help='Save audio to file')
    parser.add_argument('--play', action='store_true',
                        help='Play audio in real-time')
    parser.add_argument('--effects', action='store_true',
                        help='Apply audio effects')
    args = parser.parse_args()

    global SAVE_TO_FILE, PLAY_AUDIO, APPLY_EFFECTS
    SAVE_TO_FILE = args.save
    PLAY_AUDIO = args.play and PYDUB_AVAILABLE
    APPLY_EFFECTS = args.effects

    if args.play and not PYDUB_AVAILABLE:
        print("Warning: pydub is not available. Audio playback is disabled.")

    server = await websockets.serve(handle_client, args.host, args.port)
    print(f"WebSocket server started at ws://{args.host}:{args.port}")
    print(f"Save to file: {SAVE_TO_FILE}, Play audio: {PLAY_AUDIO}, Apply effects: {APPLY_EFFECTS}")
    await server.wait_closed()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"Error: {e}")
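A possible way to run the server (the filename audio_server.py is again just an example): install the dependencies (websockets, numpy, openai-whisper, and optionally pydub; Whisper also needs ffmpeg available on the system), then start it with python audio_server.py --save. Note that main() overrides the module-level defaults with the command-line flags, so recordings are only written when --save is passed, and playback and effects likewise require --play and --effects. Also keep in mind that whisper.load_model("large") downloads and loads a large model at startup; a smaller model such as "base" may be a more practical choice on modest hardware.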
In later articles we will also explore using the transcription text to let the agent learn more about entities, events, people, and so on. So stay tuned, and don't forget to subscribe.
DISCLAIMER: The code provided is not a production-ready setup in terms of security and stability.
All code presented in this tutorial is used at your own risk.
Always remember to run security audits before moving any code into production.
Furthermore, no part of the tutorials or code content should be considered financial advice.
Always consult a professional investment advisor before making investment decisions.
At CorticalFlow, our mission is to extend the user's cognitive abilities.
- Visit our website CorticalFlow
- Follow us on Twitter @CorticalFlow
- Visit our GitHub CorticalFlow
- Visit our YouTube channel CorticalFlow