
Google's Agent2Agent (A2A) protocol represents a significant advancement in AI agent communication, enabling seamless collaboration between AI systems regardless of their underlying frameworks or vendors. This report provides an in-depth analysis of the A2A protocol and presents a custom Python implementation that integrates A2A with Ollama, allowing interoperable agent communication without relying on existing A2A libraries.
Understanding the A2A Protocol
Agent2Agent (A2A) is an open protocol developed by Google in collaboration with more than 50 technology partners, including major companies like Salesforce, Atlassian, and SAP. Announced in April 2025 at Google Cloud Next, A2A was designed to address one of the biggest challenges in enterprise AI adoption: enabling agents built on different frameworks and by different vendors to communicate effectively with each other[^1][^9].
Core Concepts and Architecture
The A2A protocol is built on standard web technologies to facilitate wide adoption:
- Communication Layer: Uses HTTP for transport, JSON with JSON-RPC 2.0 conventions for structured messages, and Server-Sent Events (SSE) for streaming updates[^5].
- Agent Discovery: Agents advertise their capabilities through "Agent Cards" in JSON format, typically located at a standardized endpoint (/.well-known/agent.json)[^10].
- Task Management: Communication is task-oriented with well-defined lifecycle states including "submitted," "working," "input-required," "completed," "failed," and "canceled"[^5].
- Message Exchange: Agents exchange messages containing "parts" that can be text, binary file data, or structured JSON[^5].
The protocol facilitates interactions between a "client agent" (gathering and conveying tasks from users) and a "remote agent" (executing these tasks)[^5], creating an interoperable ecosystem where AI agents can freely collaborate across organizational boundaries.
A2A vs. MCP: Complementary Protocols
A2A complements Anthropic's Model Context Protocol (MCP), which focuses on integrating models with tools and data sources. While both protocols serve different primary purposes, they can work together in a cohesive AI ecosystem[^2][^11]:
- MCP focuses on tools and data integration, providing AI models with access to external data sources and tools.
- A2A emphasizes agent-to-agent communication, enabling coordination between intelligent agents[^12].
As noted by industry experts, "In a way, A2A acts as the 'team coordinator,' while MCP serves as the 'data provider'"[^2]. This distinction highlights how these protocols can complement each other, with A2A agents potentially leveraging MCP for data access while coordinating with other agents through A2A.
Implementing A2A with Ollama
The following implementation creates a Python-based A2A-compatible agent using the Ollama Python library. This implementation focuses on the core components of the A2A protocol while providing a simple, modular architecture.
The implementation is organized into a modular structure:
a2a/core/
- Core components of the A2A protocola2a/server.py
- Server implementation for exposing A2A endpointsa2a/client.py
- Client for interacting with A2A agentsexamples/
- Example applications demonstrating usage
Agent Card Implementation
Located in a2a/core/agent_card.py
, the Agent Card is a central component for capability discovery in the A2A protocol:
"""
Agent Card Module
This module handles the creation and management of A2A Agent Cards.
"""
import json
from typing import Dict, List, Any
class AgentCard:
"""
Class representing an A2A Agent Card.
Agent Cards are used for capability discovery in the A2A protocol.
"""
def __init__(
self,
name: str,
description: str,
endpoint: str,
skills: List[Dict[str, Any]],
version: str = "1.0.0",
):
"""
Initialize an Agent Card.
Args:
name: The name of the agent
description: A description of the agent
endpoint: The URL where the agent is accessible
skills: A list of skills the agent has
version: The version of the agent
"""
self.name = name
self.description = description
self.endpoint = endpoint
self.skills = skills
self.version = version
def to_dict(self) -> Dict[str, Any]:
"""
Convert the Agent Card to a dictionary.
Returns:
The Agent Card as a dictionary
"""
return {
"name": self.name,
"description": self.description,
"endpoint": self.endpoint,
"skills": self.skills,
"version": self.version,
"protocol": "a2a-1.0"
}
def to_json(self) -> str:
"""
Convert the Agent Card to a JSON string.
Returns:
The Agent Card as a JSON string
"""
return json.dumps(self.to_dict())
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'AgentCard':
"""
Create an Agent Card from a dictionary.
Args:
data: The dictionary containing agent card data
Returns:
A new AgentCard instance
"""
return cls(
name=data.get("name", ""),
description=data.get("description", ""),
endpoint=data.get("endpoint", ""),
skills=data.get("skills", []),
version=data.get("version", "1.0.0"),
)
@classmethod
def from_json(cls, json_str: str) -> 'AgentCard':
"""
Create an Agent Card from a JSON string.
Args:
json_str: The JSON string containing agent card data
Returns:
A new AgentCard instance
"""
data = json.loads(json_str)
return cls.from_dict(data)
Task Manager Implementation
The Task Manager handles the lifecycle of tasks in the A2A protocol:
"""
Task Manager Module
This module handles task creation, tracking, and lifecycle management for A2A.
"""
import json
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime
class TaskManager:
"""
Class for managing A2A tasks.
This class handles the lifecycle of tasks in the A2A protocol.
"""
def __init__(self):
"""Initialize the Task Manager."""
self.tasks = {}
def create_task(self, params: Dict[str, Any]) -> str:
"""
Create a new task.
Args:
params: Parameters for the task
Returns:
The ID of the created task
"""
task_id = str(uuid.uuid4())
task = {
"id": task_id,
"status": "submitted",
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
"params": params
}
self.tasks[task_id] = task
return task_id
def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
Get a task by ID.
Args:
task_id: The ID of the task
Returns:
The task or None if not found
"""
return self.tasks.get(task_id)
def update_task_status(self, task_id: str, status: str) -> bool:
"""
Update the status of a task.
Args:
task_id: The ID of the task
status: The new status (submitted, working, input-required, completed, failed, canceled)
Returns:
True if successful, False otherwise
"""
if task_id not in self.tasks:
return False
valid_statuses = ["submitted", "working", "input-required", "completed", "failed", "canceled"]
if status not in valid_statuses:
return False
self.tasks[task_id]["status"] = status
self.tasks[task_id]["updated_at"] = datetime.utcnow().isoformat()
return True
def list_tasks(self, status: Optional[str] = None) -> List[Dict[str, Any]]:
"""
List tasks, optionally filtered by status.
Args:
status: Filter by this status if provided
Returns:
A list of tasks
"""
if status:
return [task for task in self.tasks.values() if task["status"] == status]
else:
return list(self.tasks.values())
Message Handler Implementation
The Message Handler manages the exchange of messages between agents:
"""
Message Handler Module
This module handles message creation and exchange between agents in the A2A protocol.
"""
import json
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime
class MessageHandler:
"""
Class for handling A2A messages.
This class manages the exchange of messages between agents in the A2A protocol.
"""
def __init__(self):
"""Initialize the Message Handler."""
self.messages = {}
def add_message(self, task_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Add a message to a task.
Args:
task_id: The ID of the task
message: The message to add
Returns:
The added message
"""
if task_id not in self.messages:
self.messages[task_id] = []
# Ensure message has an ID
if "id" not in message:
message["id"] = str(uuid.uuid4())
# Add timestamp
message["timestamp"] = datetime.utcnow().isoformat()
self.messages[task_id].append(message)
return message
def get_messages(self, task_id: str) -> List[Dict[str, Any]]:
"""
Get all messages for a task.
Args:
task_id: The ID of the task
Returns:
A list of messages for the task
"""
return self.messages.get(task_id, [])
def get_message(self, task_id: str, message_id: str) -> Optional[Dict[str, Any]]:
"""
Get a specific message by ID.
Args:
task_id: The ID of the task
message_id: The ID of the message
Returns:
The message or None if not found
"""
for message in self.messages.get(task_id, []):
if message.get("id") == message_id:
return message
return None
def format_message(self, role: str, content: str, content_type: str = "text") -> Dict[str, Any]:
"""
Create a formatted A2A message.
Args:
role: The role of the message sender (user, agent)
content: The content of the message
content_type: The type of content (text, json, binary)
Returns:
A formatted A2A message
"""
return {
"id": str(uuid.uuid4()),
"role": role,
"parts": [
{
"type": content_type,
"content": content
}
]
}
A2A-Ollama Integration
The main integration module connects Ollama with the A2A protocol:
"""
A2A Ollama Integration - Main Module
This module provides the main functionality for integrating Ollama with Google's A2A protocol.
"""
import json
import uuid
from typing import Dict, List, Optional, Union, Any
import ollama
from ollama import Client
from agent_card import AgentCard
from task_manager import TaskManager
from message_handler import MessageHandler
class A2AOllama:
"""
Main class for A2A Ollama integration.
This class integrates Ollama with the A2A protocol, allowing Ollama
to communicate with other A2A-compatible agents.
"""
def __init__(
self,
model: str,
name: str,
description: str,
skills: List[Dict[str, Any]],
host: str = "http://localhost:11434",
endpoint: str = "http://localhost:8000",
):
"""
Initialize A2AOllama.
Args:
model: The Ollama model to use
name: The name of the agent
description: A description of the agent
skills: A list of skills the agent has
host: The Ollama host URL
endpoint: The endpoint where this agent is accessible
"""
self.model = model
self.client = Client(host=host)
self.agent_card = AgentCard(
name=name,
description=description,
endpoint=endpoint,
skills=skills,
)
self.task_manager = TaskManager()
self.message_handler = MessageHandler()
def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""
Process an incoming A2A request.
Args:
request: The A2A request
Returns:
The response to the request
"""
method = request.get("method")
if method == "discovery":
return self.agent_card.to_dict()
elif method == "create_task":
task_id = self.task_manager.create_task(request.get("params", {}))
return {"task_id": task_id}
elif method == "get_task":
task_id = request.get("params", {}).get("task_id")
return self.task_manager.get_task(task_id)
elif method == "add_message":
task_id = request.get("params", {}).get("task_id")
message = request.get("params", {}).get("message")
return self.message_handler.add_message(task_id, message)
elif method == "process_task":
task_id = request.get("params", {}).get("task_id")
return self._process_task(task_id)
else:
return {"error": f"Unknown method: {method}"}
def _process_task(self, task_id: str) -> Dict[str, Any]:
"""
Process a task using Ollama.
Args:
task_id: The ID of the task to process
Returns:
The result of processing the task
"""
task = self.task_manager.get_task(task_id)
if not task:
return {"error": f"Task not found: {task_id}"}
messages = self.message_handler.get_messages(task_id)
ollama_messages = []
for message in messages:
content = ""
for part in message.get("parts", []):
if part.get("type") == "text":
content += part.get("content", "")
ollama_messages.append({
"role": message.get("role", "user"),
"content": content
})
try:
response = self.client.chat(
model=self.model,
messages=ollama_messages
)
# Create A2A message from Ollama response
a2a_message = {
"id": str(uuid.uuid4()),
"role": "agent",
"parts": [
{
"type": "text",
"content": response["message"]["content"]
}
]
}
self.message_handler.add_message(task_id, a2a_message)
self.task_manager.update_task_status(task_id, "completed")
return {
"task_id": task_id,
"status": "completed",
"message": a2a_message
}
except Exception as e:
self.task_manager.update_task_status(task_id, "failed")
return {
"task_id": task_id,
"status": "failed",
"error": str(e)
}
Server Implementation
The server exposes A2A endpoints:
"""
A2A Server Module
This module provides a simple HTTP server to expose A2A endpoints.
"""
import json
import os
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Dict, Any, List, Optional
from a2a_ollama import A2AOllama
# Global A2AOllama instance
a2a_ollama = None
class A2ARequestHandler(BaseHTTPRequestHandler):
"""
HTTP request handler for A2A requests.
"""
def do_GET(self):
"""Handle GET requests."""
if self.path == "/.well-known/agent.json":
self._handle_agent_card()
elif self.path.startswith("/tasks/"):
task_id = self.path.split("/")[-1]
self._handle_get_task(task_id)
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
"""Handle POST requests."""
content_length = int(self.headers.get("Content-Length", 0))
request_body = self.rfile.read(content_length).decode("utf-8")
try:
request = json.loads(request_body)
except json.JSONDecodeError:
self._send_error("Invalid JSON")
return
if self.path == "/rpc":
self._handle_rpc(request)
elif self.path == "/tasks":
self._handle_create_task(request)
elif self.path.startswith("/tasks/") and self.path.endswith("/messages"):
task_id = self.path.split("/")[-2]
self._handle_add_message(task_id, request)
else:
self.send_response(404)
self.end_headers()
def _handle_agent_card(self):
"""Handle agent card discovery."""
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
agent_card = a2a_ollama.agent_card.to_dict()
self.wfile.write(json.dumps(agent_card).encode("utf-8"))
def _handle_get_task(self, task_id: str):
"""Handle getting a task."""
task = a2a_ollama.task_manager.get_task(task_id)
if task:
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(task).encode("utf-8"))
else:
self._send_error(f"Task not found: {task_id}")
def _handle_create_task(self, request: Dict[str, Any]):
"""Handle creating a task."""
task_id = a2a_ollama.task_manager.create_task(request)
self.send_response(201)
self.send_header("Content-Type", "application/json")
self.end_headers()
response = {"task_id": task_id}
self.wfile.write(json.dumps(response).encode("utf-8"))
def _handle_add_message(self, task_id: str, message: Dict[str, Any]):
"""Handle adding a message to a task."""
task = a2a_ollama.task_manager.get_task(task_id)
if not task:
self._send_error(f"Task not found: {task_id}")
return
added_message = a2a_ollama.message_handler.add_message(task_id, message)
# Process the task if status is submitted
if task["status"] == "submitted":
a2a_ollama.task_manager.update_task_status(task_id, "working")
result = a2a_ollama._process_task(task_id)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
response = {"message_id": added_message["id"]}
self.wfile.write(json.dumps(response).encode("utf-8"))
def _handle_rpc(self, request: Dict[str, Any]):
"""Handle RPC requests."""
response = a2a_ollama.process_request(request)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(response).encode("utf-8"))
def _send_error(self, message: str, status_code: int = 400):
"""Send an error response."""
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.end_headers()
error = {"error": message}
self.wfile.write(json.dumps(error).encode("utf-8"))
def run_server(
model: str,
name: str,
description: str,
skills: List[Dict[str, Any]],
port: int = 8000,
ollama_host: str = "http://localhost:11434",
endpoint: str = None
):
"""
Run the A2A server.
Args:
model: The Ollama model to use
name: The name of the agent
description: A description of the agent
skills: A list of skills the agent has
port: The port to run the server on
ollama_host: The Ollama host URL
endpoint: The endpoint where this agent is accessible
"""
global a2a_ollama
if endpoint is None:
endpoint = f"http://localhost:{port}"
a2a_ollama = A2AOllama(
model=model,
name=name,
description=description,
skills=skills,
host=ollama_host,
endpoint=endpoint,
)
server = HTTPServer(("", port), A2ARequestHandler)
print(f"Starting A2A server on port {port}...")
server.serve_forever()
if __name__ == "__main__":
# Example usage
skills = [
{
"id": "answer_questions",
"name": "Answer Questions",
"description": "Can answer general knowledge questions"
},
{
"id": "summarize_text",
"name": "Summarize Text",
"description": "Can summarize text content"
}
]
run_server(
model="llama3.2",
name="Ollama A2A Agent",
description="An A2A-compatible agent powered by Ollama",
skills=skills
)
Client Implementation
The client module for interacting with A2A agents:
"""
A2A Client Module
This module provides a client for interacting with A2A agents.
"""
import json
import requests
from typing import Dict, List, Optional, Any
class A2AClient:
"""
Client for interacting with A2A agents.
"""
def __init__(self, endpoint: str):
"""
Initialize the A2A client.
Args:
endpoint: The endpoint of the A2A agent
"""
self.endpoint = endpoint.rstrip("/")
def discover_agent(self) -> Dict[str, Any]:
"""
Discover an agent's capabilities.
Returns:
The agent card
"""
response = requests.get(f"{self.endpoint}/.well-known/agent.json")
response.raise_for_status()
return response.json()
def create_task(self, params: Dict[str, Any]) -> str:
"""
Create a new task.
Args:
params: Parameters for the task
Returns:
The ID of the created task
"""
response = requests.post(f"{self.endpoint}/tasks", json=params)
response.raise_for_status()
return response.json()["task_id"]
def get_task(self, task_id: str) -> Dict[str, Any]:
"""
Get a task by ID.
Args:
task_id: The ID of the task
Returns:
The task
"""
response = requests.get(f"{self.endpoint}/tasks/{task_id}")
response.raise_for_status()
return response.json()
def add_message(self, task_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Add a message to a task.
Args:
task_id: The ID of the task
message: The message to add
Returns:
The response
"""
response = requests.post(f"{self.endpoint}/tasks/{task_id}/messages", json=message)
response.raise_for_status()
return response.json()
def call_rpc(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Call an RPC method.
Args:
method: The name of the method
params: Parameters for the method
Returns:
The response
"""
if params is None:
params = {}
request = {
"jsonrpc": "2.0",
"id": "1",
"method": method,
"params": params
}
response = requests.post(f"{self.endpoint}/rpc", json=request)
response.raise_for_status()
return response.json()
def chat(self, content: str, task_id: Optional[str] = None) -> Dict[str, Any]:
"""
Chat with the agent.
Args:
content: The message content
task_id: An existing task ID (optional)
Returns:
The response
"""
if not task_id:
task_id = self.create_task({"type": "chat"})
message = {
"role": "user",
"parts": [
{
"type": "text",
"content": content
}
]
}
return self.add_message(task_id, message)
def main():
"""Example usage of the A2A client."""
import argparse
parser = argparse.ArgumentParser(description="A2A Client")
parser.add_argument("--endpoint", type=str, default="http://localhost:8000", help="The A2A agent endpoint")
parser.add_argument("--message", type=str, required=True, help="The message to send")
args = parser.parse_args()
client = A2AClient(args.endpoint)
try:
agent_card = client.discover_agent()
print(f"Connected to agent: {agent_card['name']}")
print(f"Description: {agent_card['description']}")
print(f"Skills: {', '.join(skill['name'] for skill in agent_card['skills'])}")
response = client.chat(args.message)
print("\nResponse:")
if "message" in response:
for part in response["message"]["parts"]:
if part["type"] == "text":
print(part["content"])
else:
print(response)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
Installation and Usage Instructions
To get started with the A2A Ollama Integration, follow these steps:
Prerequisites
- Make sure you have Python 3.8 or higher installed.
- Install and start Ollama following the instructions on the Ollama website.
- Pull a model to use with Ollama:
ollama pull gemma3:27b
Installation
- Clone the repository:
git clone https://github.com/CorticalFlow/a2a-ollama
cd a2a-ollama
- Create a new virtual environment (recommended):
python -m venv venv
source venv/bin/activate # On Windows, use: venv\Scripts\activate
- Install required packages:
pip install -r requirements.txt
Project Structure
The project is organized with the following structure:
lab-a2a-ollama-2/
├── a2a/ # Core A2A implementation
│ ├── core/ # Core components
│ │ ├── agent_card.py # Agent card implementation
│ │ ├── task_manager.py # Task management
│ │ ├── message_handler.py # Message handling
│ │ └── a2a_ollama.py # Ollama integration
│ ├── server.py # A2A server implementation
│ └── client.py # A2A client for interacting with agents
├── examples/ # Example applications
│ ├── simple_chat/ # Simple chat example
│ │ ├── run_agent.py # Run a single agent
│ │ └── chat_with_agent.py # Chat with the agent
│ └── multi_agent/ # Multi-agent collaboration example
│ ├── agent_creative.py # Creative agent implementation
│ ├── agent_reasoning.py # Reasoning agent implementation
│ ├── agent_knowledge.py # Knowledge agent implementation
│ └── orchestrator.py # Multi-agent orchestrator
└── requirements.txt # Project dependencies
Running the Examples
Simple Chat Example
- Start the agent server:
python examples/simple_chat/run_agent.py
- In a separate terminal, chat with the agent:
python examples/simple_chat/chat_with_agent.py --message "What is the capital of France?"
- For interactive mode:
python examples/simple_chat/chat_with_agent.py --interactive
Multi-Agent Collaboration Example
- Start each agent in separate terminals:
# Terminal 1 - Knowledge Agent (port 8001)
python examples/multi_agent/agent_knowledge.py
# Terminal 2 - Reasoning Agent (port 8002)
python examples/multi_agent/agent_reasoning.py
# Terminal 3 - Creative Agent (port 8003)
python examples/multi_agent/agent_creative.py
- Run the orchestrator to coordinate between agents:
python examples/multi_agent/orchestrator.py --topic "renewable energy"
This example demonstrates the power of A2A by allowing specialized agents to collaborate:
- The knowledge agent provides factual information about the topic
- The reasoning agent analyzes implications and patterns
- The creative agent generates engaging content
- The orchestrator coordinates the workflow and synthesizes the final report