
This article is part 2 of a series on implementing a real-time financial news agent. You can find the project overview here: AI Case Study: Implementing a Real-time Financial News Agent - Overview
Overview
In this part we will capture the screen and extract the faces of the speakers.
Because we are dealing with real-time processing, we need a lot of performance. To achieve this, we capture the screen on a client machine and send the frames to a more powerful server with 48 cores, 128 GB RAM and 2 Tesla P40 GPUs (if your client machine already has a GPU, you can run server and client on the same machine). On the server we process the video frame by frame and detect the faces. For each face appearance we create a face embedding. For now we just print the embeddings to the console; later we will extend the server to store the embeddings in a MongoDB database. In summary, this part covers:
- Capture the screen in a client server model
- Detect faces in the frames (on the server)
- Create embeddings of the faces (on the server)
Create a conda environment
To encapsulate the dependencies we will use a conda environment (check out the Conda Docs for more information).
conda create -n ai-news-agent python=3.10
conda activate ai-news-agent
Check out the code (on client and server)
git clone https://github.com/corticalflow/ai-news-agent-part-2.git
cd ai-news-agent-part-2
Install the dependencies
# on the client machine
pip install -r requirements_client.txt
# on the server machine
pip install -r requirements_server.txt
pip install tensorflow[and-cuda]
pip install tf-keras
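Optionally, you can verify that TensorFlow detects the GPUs after installation (the server script prints the same information at startup):
python -c "import tensorflow as tf; print(tf.config.list_physical_devices('GPU'))"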
Create a data folder for processed frames (on the server machine)
mkdir -p ./data
Run the server (on the server machine)
python server.py
Run the client (on the client machine; make sure host_ip in client.py points to the server)
python client.py
Client Code:
import socket
import cv2
import mss
import numpy as np
import struct

def main():
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    host_ip = '192.168.100.51'  # Replace with the server IP address
    #host_ip = '127.0.0.1'      # Use this when client and server run on the same machine
    port = 9999
    client_socket.connect((host_ip, port))
    with mss.mss() as sct:
        # Monitor index to capture; adjust to your setup (monitors[1] is the first physical display)
        monitor = sct.monitors[3]
        try:
            while True:
                # Grab a screenshot and drop the alpha channel before JPEG encoding
                img = np.array(sct.grab(monitor))
                frame = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
                _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 100])
                # Prefix each frame with its length so the server knows how many bytes to read
                message = struct.pack(">L", len(buffer)) + buffer.tobytes()
                client_socket.sendall(message)
        except KeyboardInterrupt:
            print("Stopped by user.")
        finally:
            client_socket.close()

if __name__ == "__main__":
    main()
Explanation of what the client code does:
The client code is responsible for capturing the screen and streaming the frames to a powerful server for real-time processing. Here's what it does:
- Socket Connection: Establishes a TCP connection to the server using a specified IP address and port.
- Screen Capture: Utilizes the mss library to continuously grab the contents of a designated monitor (see the note on monitor indices below).
- Color Conversion: Converts the captured image from BGRA to BGR using OpenCV, dropping the unused alpha channel so the frame can be JPEG-encoded with correct colors.
- Image Encoding: Compresses each frame into JPEG format (quality 100 here; lowering the value reduces bandwidth at the cost of some detail) for network transmission.
- Data Packaging: Uses Python's struct module to prepend a header containing the length of the JPEG data, allowing the server to know exactly how many bytes to read.
- Frame Transmission: Sends the packaged frame bytes over the socket connection continuously.
- Graceful Shutdown: Handles keyboard interruption (KeyboardInterrupt) to close the socket connection properly when the user stops the process.
This design ensures that the client efficiently streams real-time visual data to the server for further tasks like face detection and embedding generation.
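The client captures sct.monitors[3], which assumes at least three displays are attached. If you are unsure which index to use, a small sketch like the following (not part of the repository) lists the monitors that mss can see; in mss, index 0 is the combined virtual screen and indices 1 and up are the individual displays:

import mss

# Print all monitors mss can capture so you can pick the right index for client.py
with mss.mss() as sct:
    for idx, mon in enumerate(sct.monitors):
        print(idx, mon)  # e.g. {'left': 0, 'top': 0, 'width': 1920, 'height': 1080}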
Server Code:
import os

# If you don't have a CUDA GPU, uncomment the following two lines. The script will then run on the CPU, which can be slow.
#os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # Disable GPU usage to prevent CUDA warnings when drivers are missing.
#os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"   # Suppress TensorFlow info and warning messages.

# The server has no display attached, so we use offscreen rendering
os.environ["QT_QPA_PLATFORM"] = "offscreen"  # Use offscreen rendering to bypass display issues

import cv2
import socket
import numpy as np
import struct
from datetime import datetime
from retinaface import RetinaFace  # Face detection
from deepface import DeepFace      # Face embeddings
import tensorflow as tf

print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

def receive_all(sock, count):
    """Read exactly `count` bytes from the socket, or return None if the connection closes."""
    buf = b''
    while count:
        newbuf = sock.recv(count)
        if not newbuf:
            return None
        buf += newbuf
        count -= len(newbuf)
    return buf

def main():
    print('main')
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    host_ip = '0.0.0.0'
    port = 9999
    server_socket.bind((host_ip, port))
    server_socket.listen(5)
    print("Listening at:", (host_ip, port))
    client_socket, addr = server_socket.accept()
    print('Connection from:', addr)
    # No GUI window initialization needed since we are running headless
    # cv2.namedWindow('Received', cv2.WINDOW_NORMAL)
    try:
        while True:
            # Read the 4-byte length header, then the JPEG payload of exactly that size
            message_size = receive_all(client_socket, struct.calcsize(">L"))
            if not message_size:
                break
            message_size = struct.unpack(">L", message_size)[0]
            frame_data = receive_all(client_socket, message_size)
            if not frame_data:
                break
            print('new frame received')
            frame = np.frombuffer(frame_data, dtype=np.uint8)
            frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)

            # extract text (placeholder, will be implemented in a later part)

            # Face detection using RetinaFace and drawing bounding boxes
            faces = RetinaFace.detect_faces(frame)
            if isinstance(faces, dict):
                for face in faces.values():
                    print('found face')
                    print(face)
                    bbox = face["facial_area"]
                    cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 0, 255), 2)
                    # Extract the face region using the bounding box coordinates
                    face_img = frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]
                    # Compute a vector embedding using DeepFace with the Facenet model
                    embedding = DeepFace.represent(face_img, model_name='Facenet', enforce_detection=False)
                    print("Face Embedding:", embedding)

            timestamp = datetime.utcnow().timestamp()
            # Save the processed frame to the filesystem
            cv2.imwrite(f'./data/frame_{timestamp}.png', frame)

            # GUI key-check removed since there is no display attached
            # if cv2.waitKey(1) & 0xFF == ord('q'):
            #     break
    except Exception as e:
        print(e)
    finally:
        print('finally')
        cv2.destroyAllWindows()
        client_socket.close()
        server_socket.close()

if __name__ == "__main__":
    print("main call")
    main()
Explanation of what the server code does:
- The code establishes a continuous loop to receive video frames over a network connection.
- It unpacks the expected size of an incoming frame and reads the complete frame data from the client socket.
- The binary frame data is converted into a numpy array and then decoded into an image using OpenCV.
- A placeholder exists for text extraction from the frame (to be implemented later).
- Face detection is performed using the RetinaFace library:
- If faces are detected (returned as a dictionary), the code iterates through each face.
- For each detected face, it logs that a face was found, draws a red bounding box around the face using the given coordinates, and extracts the face region.
- A facial embedding vector is computed using DeepFace with the Facenet model, and the resulting embedding is logged (see the note on its return format below).
- A timestamp is generated, and the processed frame is saved to the filesystem.
- In the finally block, the code cleans up resources by closing the OpenCV windows and the network sockets.
This design is optimized for a headless operation where no GUI interactions (like key-press detections) are required.
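One detail worth noting: DeepFace.represent does not return a bare vector. In recent DeepFace versions it returns a list with one dictionary per detected face, where the vector sits under the "embedding" key (128 floats for the Facenet model). Here is a minimal sketch for pulling out just the vectors, assuming that return format (verify it against your installed DeepFace version):

# Hypothetical helper, assuming DeepFace.represent returns a list of dicts with an "embedding" key
def extract_vectors(representation):
    return [entry["embedding"] for entry in representation]

# Usage inside the server loop (sketch):
# embedding = DeepFace.represent(face_img, model_name='Facenet', enforce_detection=False)
# vectors = extract_vectors(embedding)  # each vector has 128 floats for Facenet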
Further processing will include:
- Store the embeddings in a MongoDB database (a minimal sketch follows below)
- Face clustering
- Assign identities (for example, matching name tags on the screen to the faces)
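As a preview of the MongoDB step, here is a minimal sketch of how an embedding could be persisted with pymongo; the connection string, database and collection names are placeholders, and the actual schema will be defined in a later part:

from datetime import datetime
from pymongo import MongoClient

# Hypothetical storage helper; connection string, database and collection names are placeholders
client = MongoClient("mongodb://localhost:27017")
collection = client["news_agent"]["face_embeddings"]

def store_embedding(vector, frame_timestamp):
    collection.insert_one({
        "embedding": vector,              # list of floats from DeepFace
        "frame_timestamp": frame_timestamp,
        "created_at": datetime.utcnow(),
    })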
You can find the code for this part in the GitHub repository.