Google's Agent2Agent (A2A) Protocol: Implementation with Ollama Integration

Published on April 24, 2025

A2A Ollama: an implementation of the A2A protocol using Ollama

Google's Agent2Agent (A2A) protocol represents a significant advancement in AI agent communication, enabling seamless collaboration between AI systems regardless of their underlying frameworks or vendors. This report provides an in-depth analysis of the A2A protocol and presents a custom Python implementation that integrates A2A with Ollama, allowing interoperable agent communication without relying on existing A2A libraries.

Understanding the A2A Protocol

Agent2Agent (A2A) is an open protocol developed by Google in collaboration with more than 50 technology partners, including major companies like Salesforce, Atlassian, and SAP. Announced in April 2025 at Google Cloud Next, A2A was designed to address one of the biggest challenges in enterprise AI adoption: enabling agents built on different frameworks and by different vendors to communicate effectively with each other[^1][^9].

Core Concepts and Architecture

The A2A protocol is built on standard web technologies to facilitate wide adoption:

  • Communication Layer: Uses HTTP for transport, JSON with JSON-RPC 2.0 conventions for structured messages, and Server-Sent Events (SSE) for streaming updates[^5].
  • Agent Discovery: Agents advertise their capabilities through "Agent Cards" in JSON format, typically located at a standardized endpoint (/.well-known/agent.json)[^10].
  • Task Management: Communication is task-oriented with well-defined lifecycle states including "submitted," "working," "input-required," "completed," "failed," and "canceled"[^5].
  • Message Exchange: Agents exchange messages containing "parts" that can be text, binary file data, or structured JSON[^5].

The protocol facilitates interactions between a "client agent" (gathering and conveying tasks from users) and a "remote agent" (executing these tasks)[^5], creating an interoperable ecosystem where AI agents can freely collaborate across organizational boundaries.
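
To make these conventions concrete, here is a minimal sketch of a JSON-RPC 2.0 request a client might POST to an agent's RPC endpoint. The "create_task" method name, the params shape, and the localhost URL mirror the implementation developed later in this report; they are assumptions of this implementation rather than normative A2A fields.

# Sketch: sending a JSON-RPC 2.0 request to an A2A agent's RPC endpoint.
# Method name, params, and URL are illustrative, not normative A2A.
import requests

rpc_request = {
    "jsonrpc": "2.0",
    "id": "1",
    "method": "create_task",
    "params": {"type": "chat"},
}

response = requests.post("http://localhost:8000/rpc", json=rpc_request)
print(response.json())  # e.g. {"task_id": "..."}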

A2A vs. MCP: Complementary Protocols

A2A complements Anthropic's Model Context Protocol (MCP), which focuses on integrating models with tools and data sources. While both protocols serve different primary purposes, they can work together in a cohesive AI ecosystem[^2][^11]:

  • MCP focuses on tools and data integration, providing AI models with access to external data sources and tools.
  • A2A emphasizes agent-to-agent communication, enabling coordination between intelligent agents[^12].

As noted by industry experts, "In a way, A2A acts as the 'team coordinator,' while MCP serves as the 'data provider'"[^2]. This distinction highlights how these protocols can complement each other, with A2A agents potentially leveraging MCP for data access while coordinating with other agents through A2A.

Implementing A2A with Ollama

The following implementation creates a Python-based A2A-compatible agent using the Ollama Python library. This implementation focuses on the core components of the A2A protocol while providing a simple, modular architecture.

The implementation is organized into a modular structure:

  • a2a/core/ - Core components of the A2A protocol
  • a2a/server.py - Server implementation for exposing A2A endpoints
  • a2a/client.py - Client for interacting with A2A agents
  • examples/ - Example applications demonstrating usage

Agent Card Implementation

Located in a2a/core/agent_card.py, the Agent Card is a central component for capability discovery in the A2A protocol:

"""
Agent Card Module

This module handles the creation and management of A2A Agent Cards.
"""

import json
from typing import Dict, List, Any

class AgentCard:
    """
    Class representing an A2A Agent Card.

    Agent Cards are used for capability discovery in the A2A protocol.
    """

    def __init__(
        self,
        name: str,
        description: str,
        endpoint: str,
        skills: List[Dict[str, Any]],
        version: str = "1.0.0",
    ):
        """
        Initialize an Agent Card.

        Args:
            name: The name of the agent
            description: A description of the agent
            endpoint: The URL where the agent is accessible
            skills: A list of skills the agent has
            version: The version of the agent
        """
        self.name = name
        self.description = description
        self.endpoint = endpoint
        self.skills = skills
        self.version = version

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the Agent Card to a dictionary.

        Returns:
            The Agent Card as a dictionary
        """
        return {
            "name": self.name,
            "description": self.description,
            "endpoint": self.endpoint,
            "skills": self.skills,
            "version": self.version,
            "protocol": "a2a-1.0"
        }

    def to_json(self) -> str:
        """
        Convert the Agent Card to a JSON string.

        Returns:
            The Agent Card as a JSON string
        """
        return json.dumps(self.to_dict())

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AgentCard':
        """
        Create an Agent Card from a dictionary.

        Args:
            data: The dictionary containing agent card data

        Returns:
            A new AgentCard instance
        """
        return cls(
            name=data.get("name", ""),
            description=data.get("description", ""),
            endpoint=data.get("endpoint", ""),
            skills=data.get("skills", []),
            version=data.get("version", "1.0.0"),
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'AgentCard':
        """
        Create an Agent Card from a JSON string.

        Args:
            json_str: The JSON string containing agent card data

        Returns:
            A new AgentCard instance
        """
        data = json.loads(json_str)
        return cls.from_dict(data)
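
A quick usage sketch (the skill entry matches the example skills used later in this report):

# Usage sketch: build a card, serialize it for /.well-known/agent.json,
# and round-trip it back from JSON.
card = AgentCard(
    name="Ollama A2A Agent",
    description="An A2A-compatible agent powered by Ollama",
    endpoint="http://localhost:8000",
    skills=[{
        "id": "answer_questions",
        "name": "Answer Questions",
        "description": "Can answer general knowledge questions"
    }],
)

json_str = card.to_json()             # serve this at /.well-known/agent.json
restored = AgentCard.from_json(json_str)
assert restored.name == card.name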

Task Manager Implementation

The Task Manager handles the lifecycle of tasks in the A2A protocol:

"""
Task Manager Module

This module handles task creation, tracking, and lifecycle management for A2A.
"""

import json
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime

class TaskManager:
    """
    Class for managing A2A tasks.

    This class handles the lifecycle of tasks in the A2A protocol.
    """

    def __init__(self):
        """Initialize the Task Manager."""
        self.tasks = {}

    def create_task(self, params: Dict[str, Any]) -> str:
        """
        Create a new task.

        Args:
            params: Parameters for the task

        Returns:
            The ID of the created task
        """
        task_id = str(uuid.uuid4())

        task = {
            "id": task_id,
            "status": "submitted",
            "created_at": datetime.utcnow().isoformat(),
            "updated_at": datetime.utcnow().isoformat(),
            "params": params
        }

        self.tasks[task_id] = task
        return task_id

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a task by ID.

        Args:
            task_id: The ID of the task

        Returns:
            The task or None if not found
        """
        return self.tasks.get(task_id)

    def update_task_status(self, task_id: str, status: str) -> bool:
        """
        Update the status of a task.

        Args:
            task_id: The ID of the task
            status: The new status (submitted, working, input-required, completed, failed, canceled)

        Returns:
            True if successful, False otherwise
        """
        if task_id not in self.tasks:
            return False

        valid_statuses = ["submitted", "working", "input-required", "completed", "failed", "canceled"]
        if status not in valid_statuses:
            return False

        self.tasks[task_id]["status"] = status
        self.tasks[task_id]["updated_at"] = datetime.utcnow().isoformat()

        return True

    def list_tasks(self, status: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        List tasks, optionally filtered by status.

        Args:
            status: Filter by this status if provided

        Returns:
            A list of tasks
        """
        if status:
            return [task for task in self.tasks.values() if task["status"] == status]
        else:
            return list(self.tasks.values())
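
A brief sketch of the lifecycle as this class models it:

# Lifecycle sketch: a task moves from "submitted" through "working"
# to a terminal state such as "completed".
manager = TaskManager()
task_id = manager.create_task({"type": "chat"})      # status: "submitted"

manager.update_task_status(task_id, "working")
manager.update_task_status(task_id, "completed")

print(manager.get_task(task_id)["status"])           # completed
print(len(manager.list_tasks(status="completed")))   # 1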

Message Handler Implementation

The Message Handler manages the exchange of messages between agents:

"""
Message Handler Module

This module handles message creation and exchange between agents in the A2A protocol.
"""

import json
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime

class MessageHandler:
    """
    Class for handling A2A messages.

    This class manages the exchange of messages between agents in the A2A protocol.
    """

    def __init__(self):
        """Initialize the Message Handler."""
        self.messages = {}

    def add_message(self, task_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
        """
        Add a message to a task.

        Args:
            task_id: The ID of the task
            message: The message to add

        Returns:
            The added message
        """
        if task_id not in self.messages:
            self.messages[task_id] = []

        # Ensure message has an ID
        if "id" not in message:
            message["id"] = str(uuid.uuid4())

        # Add timestamp
        message["timestamp"] = datetime.utcnow().isoformat()

        self.messages[task_id].append(message)
        return message

    def get_messages(self, task_id: str) -> List[Dict[str, Any]]:
        """
        Get all messages for a task.

        Args:
            task_id: The ID of the task

        Returns:
            A list of messages for the task
        """
        return self.messages.get(task_id, [])

    def get_message(self, task_id: str, message_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific message by ID.

        Args:
            task_id: The ID of the task
            message_id: The ID of the message

        Returns:
            The message or None if not found
        """
        for message in self.messages.get(task_id, []):
            if message.get("id") == message_id:
                return message

        return None

    def format_message(self, role: str, content: str, content_type: str = "text") -> Dict[str, Any]:
        """
        Create a formatted A2A message.

        Args:
            role: The role of the message sender (user, agent)
            content: The content of the message
            content_type: The type of content (text, json, binary)

        Returns:
            A formatted A2A message
        """
        return {
            "id": str(uuid.uuid4()),
            "role": role,
            "parts": [
                {
                    "type": content_type,
                    "content": content
                }
            ]
        }
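
A short usage sketch (the task ID is a placeholder; in practice it comes from the Task Manager):

# Usage sketch: build an A2A message with text parts and attach it to a
# task; add_message assigns an ID and a timestamp if they are missing.
handler = MessageHandler()

msg = handler.format_message(role="user", content="What is A2A?")
stored = handler.add_message("example-task-id", msg)

print(stored["timestamp"])                      # ISO-8601 timestamp
print(handler.get_messages("example-task-id"))  # [stored]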

A2A-Ollama Integration

The main integration module connects Ollama with the A2A protocol:

"""
A2A Ollama Integration - Main Module

This module provides the main functionality for integrating Ollama with Google's A2A protocol.
"""

import json
import uuid
from typing import Dict, List, Optional, Union, Any

import ollama
from ollama import Client

# Note: these flat imports assume the a2a/core modules are on the Python
# path; inside a package they would be relative (e.g. from .agent_card import AgentCard).
from agent_card import AgentCard
from task_manager import TaskManager
from message_handler import MessageHandler

class A2AOllama:
    """
    Main class for A2A Ollama integration.

    This class integrates Ollama with the A2A protocol, allowing Ollama
    to communicate with other A2A-compatible agents.
    """

    def __init__(
        self,
        model: str,
        name: str,
        description: str,
        skills: List[Dict[str, Any]],
        host: str = "http://localhost:11434",
        endpoint: str = "http://localhost:8000",
    ):
        """
        Initialize A2AOllama.

        Args:
            model: The Ollama model to use
            name: The name of the agent
            description: A description of the agent
            skills: A list of skills the agent has
            host: The Ollama host URL
            endpoint: The endpoint where this agent is accessible
        """
        self.model = model
        self.client = Client(host=host)
        self.agent_card = AgentCard(
            name=name,
            description=description,
            endpoint=endpoint,
            skills=skills,
        )
        self.task_manager = TaskManager()
        self.message_handler = MessageHandler()

    def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process an incoming A2A request.

        Args:
            request: The A2A request

        Returns:
            The response to the request
        """
        method = request.get("method")

        if method == "discovery":
            return self.agent_card.to_dict()
        elif method == "create_task":
            task_id = self.task_manager.create_task(request.get("params", {}))
            return {"task_id": task_id}
        elif method == "get_task":
            task_id = request.get("params", {}).get("task_id")
            return self.task_manager.get_task(task_id)
        elif method == "add_message":
            task_id = request.get("params", {}).get("task_id")
            message = request.get("params", {}).get("message")
            return self.message_handler.add_message(task_id, message)
        elif method == "process_task":
            task_id = request.get("params", {}).get("task_id")
            return self._process_task(task_id)
        else:
            return {"error": f"Unknown method: {method}"}

    def _process_task(self, task_id: str) -> Dict[str, Any]:
        """
        Process a task using Ollama.

        Args:
            task_id: The ID of the task to process

        Returns:
            The result of processing the task
        """
        task = self.task_manager.get_task(task_id)

        if not task:
            return {"error": f"Task not found: {task_id}"}

        messages = self.message_handler.get_messages(task_id)
        ollama_messages = []

        for message in messages:
            content = ""
            for part in message.get("parts", []):
                if part.get("type") == "text":
                    content += part.get("content", "")

            ollama_messages.append({
                "role": message.get("role", "user"),
                "content": content
            })

        try:
            response = self.client.chat(
                model=self.model,
                messages=ollama_messages
            )

            # Create A2A message from Ollama response
            a2a_message = {
                "id": str(uuid.uuid4()),
                "role": "agent",
                "parts": [
                    {
                        "type": "text",
                        "content": response["message"]["content"]
                    }
                ]
            }

            self.message_handler.add_message(task_id, a2a_message)
            self.task_manager.update_task_status(task_id, "completed")

            return {
                "task_id": task_id,
                "status": "completed",
                "message": a2a_message
            }
        except Exception as e:
            self.task_manager.update_task_status(task_id, "failed")
            return {
                "task_id": task_id,
                "status": "failed",
                "error": str(e)
            }
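
Putting the pieces together, the following sketch exercises the class directly (it calls the private _process_task for illustration and assumes Ollama is running locally with the llama3.2 model pulled):

# End-to-end sketch: create a task, attach a user message, and let
# Ollama generate the agent's reply. Assumes a local Ollama server.
agent = A2AOllama(
    model="llama3.2",
    name="Ollama A2A Agent",
    description="An A2A-compatible agent powered by Ollama",
    skills=[{
        "id": "answer_questions",
        "name": "Answer Questions",
        "description": "Can answer general knowledge questions"
    }],
)

task_id = agent.task_manager.create_task({"type": "chat"})
agent.message_handler.add_message(
    task_id,
    agent.message_handler.format_message("user", "What is the capital of France?"),
)

result = agent._process_task(task_id)
print(result["status"])        # "completed" on success
print(result.get("message"))   # the agent's A2A reply message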

Server Implementation

The server exposes A2A endpoints:

"""
A2A Server Module

This module provides a simple HTTP server to expose A2A endpoints.
"""

import json
import os
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Dict, Any, List, Optional

# Flat import; assumes a2a/core is on the Python path.
from a2a_ollama import A2AOllama

# Global A2AOllama instance
a2a_ollama = None

class A2ARequestHandler(BaseHTTPRequestHandler):
    """
    HTTP request handler for A2A requests.
    """

    def do_GET(self):
        """Handle GET requests."""
        if self.path == "/.well-known/agent.json":
            self._handle_agent_card()
        elif self.path.startswith("/tasks/"):
            task_id = self.path.split("/")[-1]
            self._handle_get_task(task_id)
        else:
            self.send_response(404)
            self.end_headers()

    def do_POST(self):
        """Handle POST requests."""
        content_length = int(self.headers.get("Content-Length", 0))
        request_body = self.rfile.read(content_length).decode("utf-8")

        try:
            request = json.loads(request_body)
        except json.JSONDecodeError:
            self._send_error("Invalid JSON")
            return

        if self.path == "/rpc":
            self._handle_rpc(request)
        elif self.path == "/tasks":
            self._handle_create_task(request)
        elif self.path.startswith("/tasks/") and self.path.endswith("/messages"):
            task_id = self.path.split("/")[-2]
            self._handle_add_message(task_id, request)
        else:
            self.send_response(404)
            self.end_headers()

    def _handle_agent_card(self):
        """Handle agent card discovery."""
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()

        agent_card = a2a_ollama.agent_card.to_dict()
        self.wfile.write(json.dumps(agent_card).encode("utf-8"))

    def _handle_get_task(self, task_id: str):
        """Handle getting a task."""
        task = a2a_ollama.task_manager.get_task(task_id)

        if task:
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()

            self.wfile.write(json.dumps(task).encode("utf-8"))
        else:
            self._send_error(f"Task not found: {task_id}", 404)

    def _handle_create_task(self, request: Dict[str, Any]):
        """Handle creating a task."""
        task_id = a2a_ollama.task_manager.create_task(request)

        self.send_response(201)
        self.send_header("Content-Type", "application/json")
        self.end_headers()

        response = {"task_id": task_id}
        self.wfile.write(json.dumps(response).encode("utf-8"))

    def _handle_add_message(self, task_id: str, message: Dict[str, Any]):
        """Handle adding a message to a task."""
        task = a2a_ollama.task_manager.get_task(task_id)

        if not task:
            self._send_error(f"Task not found: {task_id}", 404)
            return

        added_message = a2a_ollama.message_handler.add_message(task_id, message)

        # Process the task if status is submitted
        if task["status"] == "submitted":
            a2a_ollama.task_manager.update_task_status(task_id, "working")
            result = a2a_ollama._process_task(task_id)

            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()

            self.wfile.write(json.dumps(result).encode("utf-8"))
        else:
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()

            response = {"message_id": added_message["id"]}
            self.wfile.write(json.dumps(response).encode("utf-8"))

    def _handle_rpc(self, request: Dict[str, Any]):
        """Handle RPC requests."""
        response = a2a_ollama.process_request(request)

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()

        self.wfile.write(json.dumps(response).encode("utf-8"))

    def _send_error(self, message: str, status_code: int = 400):
        """Send an error response."""
        self.send_response(status_code)
        self.send_header("Content-Type", "application/json")
        self.end_headers()

        error = {"error": message}
        self.wfile.write(json.dumps(error).encode("utf-8"))

def run_server(
    model: str,
    name: str,
    description: str,
    skills: List[Dict[str, Any]],
    port: int = 8000,
    ollama_host: str = "http://localhost:11434",
    endpoint: Optional[str] = None
):
    """
    Run the A2A server.

    Args:
        model: The Ollama model to use
        name: The name of the agent
        description: A description of the agent
        skills: A list of skills the agent has
        port: The port to run the server on
        ollama_host: The Ollama host URL
        endpoint: The endpoint where this agent is accessible
    """
    global a2a_ollama

    if endpoint is None:
        endpoint = f"http://localhost:{port}"

    a2a_ollama = A2AOllama(
        model=model,
        name=name,
        description=description,
        skills=skills,
        host=ollama_host,
        endpoint=endpoint,
    )

    server = HTTPServer(("", port), A2ARequestHandler)
    print(f"Starting A2A server on port {port}...")
    server.serve_forever()

if __name__ == "__main__":
    # Example usage
    skills = [
        {
            "id": "answer_questions",
            "name": "Answer Questions",
            "description": "Can answer general knowledge questions"
        },
        {
            "id": "summarize_text",
            "name": "Summarize Text",
            "description": "Can summarize text content"
        }
    ]

    run_server(
        model="llama3.2",
        name="Ollama A2A Agent",
        description="An A2A-compatible agent powered by Ollama",
        skills=skills
    )
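
With the server running, its endpoints can be exercised directly over HTTP before reaching for the client class below. A sketch using the requests package (assumed installed):

# HTTP-level sketch: discover the agent, create a task, send a message.
# Because a new task starts in the "submitted" state, the server
# processes it immediately and returns the agent's reply.
import requests

base = "http://localhost:8000"

card = requests.get(f"{base}/.well-known/agent.json").json()
print(card["name"], card["skills"])

task_id = requests.post(f"{base}/tasks", json={"type": "chat"}).json()["task_id"]

reply = requests.post(
    f"{base}/tasks/{task_id}/messages",
    json={"role": "user", "parts": [{"type": "text", "content": "Hello"}]},
).json()
print(reply)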

Client Implementation

The client module for interacting with A2A agents:

"""
A2A Client Module

This module provides a client for interacting with A2A agents.
"""

import json
import requests
from typing import Dict, List, Optional, Any

class A2AClient:
    """
    Client for interacting with A2A agents.
    """

    def __init__(self, endpoint: str):
        """
        Initialize the A2A client.

        Args:
            endpoint: The endpoint of the A2A agent
        """
        self.endpoint = endpoint.rstrip("/")

    def discover_agent(self) -> Dict[str, Any]:
        """
        Discover an agent's capabilities.

        Returns:
            The agent card
        """
        response = requests.get(f"{self.endpoint}/.well-known/agent.json")
        response.raise_for_status()
        return response.json()

    def create_task(self, params: Dict[str, Any]) -> str:
        """
        Create a new task.

        Args:
            params: Parameters for the task

        Returns:
            The ID of the created task
        """
        response = requests.post(f"{self.endpoint}/tasks", json=params)
        response.raise_for_status()
        return response.json()["task_id"]

    def get_task(self, task_id: str) -> Dict[str, Any]:
        """
        Get a task by ID.

        Args:
            task_id: The ID of the task

        Returns:
            The task
        """
        response = requests.get(f"{self.endpoint}/tasks/{task_id}")
        response.raise_for_status()
        return response.json()

    def add_message(self, task_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
        """
        Add a message to a task.

        Args:
            task_id: The ID of the task
            message: The message to add

        Returns:
            The response
        """
        response = requests.post(f"{self.endpoint}/tasks/{task_id}/messages", json=message)
        response.raise_for_status()
        return response.json()

    def call_rpc(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Call an RPC method.

        Args:
            method: The name of the method
            params: Parameters for the method

        Returns:
            The response
        """
        if params is None:
            params = {}

        request = {
            "jsonrpc": "2.0",
            "id": "1",
            "method": method,
            "params": params
        }

        response = requests.post(f"{self.endpoint}/rpc", json=request)
        response.raise_for_status()
        return response.json()

    def chat(self, content: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Chat with the agent.

        Args:
            content: The message content
            task_id: An existing task ID (optional)

        Returns:
            The response
        """
        if not task_id:
            task_id = self.create_task({"type": "chat"})

        message = {
            "role": "user",
            "parts": [
                {
                    "type": "text",
                    "content": content
                }
            ]
        }

        return self.add_message(task_id, message)

def main():
    """Example usage of the A2A client."""
    import argparse

    parser = argparse.ArgumentParser(description="A2A Client")
    parser.add_argument("--endpoint", type=str, default="http://localhost:8000", help="The A2A agent endpoint")
    parser.add_argument("--message", type=str, required=True, help="The message to send")

    args = parser.parse_args()

    client = A2AClient(args.endpoint)

    try:
        agent_card = client.discover_agent()
        print(f"Connected to agent: {agent_card['name']}")
        print(f"Description: {agent_card['description']}")
        print(f"Skills: {', '.join(skill['name'] for skill in agent_card['skills'])}")

        response = client.chat(args.message)
        print("\nResponse:")

        if "message" in response:
            for part in response["message"]["parts"]:
                if part["type"] == "text":
                    print(part["content"])
        else:
            print(response)
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()

Installation and Usage Instructions

To get started with the A2A Ollama Integration, follow these steps:

Prerequisites

  1. Make sure you have Python 3.8 or higher installed.
  2. Install and start Ollama following the instructions on the Ollama website.
  3. Pull a model to use with Ollama (the server example above uses llama3.2):
ollama pull llama3.2

Installation

  1. Clone the repository:
git clone https://github.com/CorticalFlow/a2a-ollama
cd a2a-ollama
  2. Create a new virtual environment (recommended):
python -m venv venv
source venv/bin/activate  # On Windows, use: venv\Scripts\activate
  3. Install required packages:
pip install -r requirements.txt

Project Structure

The project is organized with the following structure:

a2a-ollama/
├── a2a/                       # Core A2A implementation
│   ├── core/                  # Core components
│   │   ├── agent_card.py      # Agent card implementation
│   │   ├── task_manager.py    # Task management
│   │   ├── message_handler.py # Message handling
│   │   └── a2a_ollama.py      # Ollama integration
│   ├── server.py              # A2A server implementation
│   └── client.py              # A2A client for interacting with agents
├── examples/                  # Example applications
│   ├── simple_chat/           # Simple chat example
│   │   ├── run_agent.py       # Run a single agent
│   │   └── chat_with_agent.py # Chat with the agent
│   └── multi_agent/           # Multi-agent collaboration example
│       ├── agent_creative.py  # Creative agent implementation
│       ├── agent_reasoning.py # Reasoning agent implementation
│       ├── agent_knowledge.py # Knowledge agent implementation
│       └── orchestrator.py    # Multi-agent orchestrator
└── requirements.txt           # Project dependencies

Running the Examples

Simple Chat Example

  1. Start the agent server:
python examples/simple_chat/run_agent.py
  2. In a separate terminal, chat with the agent:
python examples/simple_chat/chat_with_agent.py --message "What is the capital of France?"
  3. For interactive mode:
python examples/simple_chat/chat_with_agent.py --interactive

Multi-Agent Collaboration Example

  1. Start each agent in separate terminals:
# Terminal 1 - Knowledge Agent (port 8001)
python examples/multi_agent/agent_knowledge.py

# Terminal 2 - Reasoning Agent (port 8002)
python examples/multi_agent/agent_reasoning.py

# Terminal 3 - Creative Agent (port 8003)
python examples/multi_agent/agent_creative.py
  2. Run the orchestrator to coordinate between agents:
python examples/multi_agent/orchestrator.py --topic "renewable energy"

This example demonstrates the power of A2A by allowing specialized agents to collaborate:

  • The knowledge agent provides factual information about the topic
  • The reasoning agent analyzes implications and patterns
  • The creative agent generates engaging content
  • The orchestrator coordinates the workflow and synthesizes the final report
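
The orchestrator itself is not reproduced here, but a minimal sketch of the pattern chains A2AClient calls, feeding each agent's output to the next. The import path and the prompts are assumptions; the ports match the setup above.

# Orchestration sketch (hypothetical, not the repository's orchestrator.py):
# each specialized agent refines the previous agent's output.
from a2a.client import A2AClient  # assumed import path

def _text(response: dict) -> str:
    """Extract the text parts from an agent's reply message."""
    message = response.get("message", {})
    return "".join(part.get("content", "")
                   for part in message.get("parts", [])
                   if part.get("type") == "text")

def build_report(topic: str) -> str:
    knowledge = A2AClient("http://localhost:8001")
    reasoning = A2AClient("http://localhost:8002")
    creative = A2AClient("http://localhost:8003")

    facts = _text(knowledge.chat(f"List key facts about {topic}."))
    analysis = _text(reasoning.chat(f"Analyze the implications of: {facts}"))
    return _text(creative.chat(f"Write an engaging summary of: {analysis}"))

if __name__ == "__main__":
    print(build_report("renewable energy"))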

At CorticalFlow, our mission is to expand the cognitive abilities of our users.

Disclaimer

The provided code is not a production-ready setup with regard to security and stability. All code presented in this tutorial is used at your own risk. Always conduct security audits before putting any code into production.

No part of this tutorial or its code should be considered financial advice. Always consult a professional investment advisor before making an investment.