By Diptiman Raichaudhuri, Staff Developer Advocate at Confluent
Engineering teams are experimenting with AI agents at a rapid pace. In fact, a recent PwC survey found that
With orchestrator agents, it becomes possible to combine responses from different agentic invocations and return a single, meaningful insight to the user. This represents a shift from simple request-response cycles to a pattern where heterogeneous microservices exchange messages across multiple enterprise applications. For a Python developer using frameworks like FastAPI, the transition is a small step: the orchestrator becomes a more sophisticated "API endpoint" that coordinates data flow between specialized agents.
Frameworks such as FastMCP enable Python developers to quickly build deterministic MCP tools, so the ambiguity of an LLM trying to interpret raw user instructions is replaced by enterprise components that fetch insights. FastMCP is the framework of choice for converting FastAPI-based REST API endpoints into MCP tool call specifications. While MCP abstracts away the individual tool invocations on databases and APIs, the existing Python API codebase still needs to be modified to introduce these agentic invocation endpoints. For production deployments, ensure that the exact MCP tool is invoked instead of listing “all tools”, which often results in MCP “tool sprawl” and “context bloat”. To avoid this, a common technique is to place a “search tool” API call ahead of the actual MCP tool call, so that only the relevant tools reach the model.
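The “search tool” idea can be sketched in plain Python. This is a minimal illustration, not how FastMCP or any MCP server implements it: the `ToolSpec` class, the `CATALOG` entries and the `search_tools` function are hypothetical names, and the keyword-overlap ranking stands in for whatever retrieval method a real deployment would use.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToolSpec:
    """Hypothetical, trimmed-down description of an MCP tool."""
    name: str
    description: str


# Hypothetical catalog; a real MCP server would report its own tool list.
CATALOG = [
    ToolSpec("consume_orders", "Consume recent order messages from a Kafka topic"),
    ToolSpec("query_order_history", "Run a SQL query over historical order tables"),
    ToolSpec("list_topics", "List Kafka topics available in the cluster"),
    ToolSpec("create_topic", "Create a new Kafka topic"),
]


def search_tools(query: str, catalog: list[ToolSpec], limit: int = 2) -> list[ToolSpec]:
    """Rank tools by how many query words appear in their name or description."""
    words = set(query.lower().split())
    scored = []
    for tool in catalog:
        text = tool.name.replace("_", " ") + " " + tool.description
        score = len(words & set(text.lower().split()))
        if score > 0:
            scored.append((score, tool))
    # Highest score first; ties keep catalog order because the sort is stable.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [tool for _, tool in scored[:limit]]


if __name__ == "__main__":
    shortlist = search_tools("historical order query", CATALOG)
    print([tool.name for tool in shortlist])
```

Only the shortlisted tool descriptions would then be passed to the LLM, which keeps the prompt small no matter how many tools the servers expose.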
For streaming workloads,
To enrich, aggregate and transform streaming data stored in Kafka, Apache Flink SQL provides a familiar programming model. With Flink SQL, real-time agentic workflows can impose data quality rules and transform in-flight data on Kafka topics. This is an important step in the data pipeline: it ensures that agents consuming data further downstream work with rich, clean and trustworthy data.
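To make the idea of a data quality rule plus an aggregation concrete, here is a pure-Python stand-in for what such a step does. It is not Flink SQL and does not use any Flink API; in the pipeline described here this logic would live in a Flink SQL statement. The field names (`ts`, `store_id`, `amount`), the five-minute window size and the sample records are assumptions made for illustration.

```python
from collections import Counter

WINDOW_SECONDS = 300  # five-minute tumbling window (assumed size)


def is_valid(order: dict) -> bool:
    """Data quality rule: require a store id and a positive amount."""
    return bool(order.get("store_id")) and order.get("amount", 0) > 0


def orders_per_store(orders: list[dict]) -> dict[tuple[int, str], int]:
    """Count valid orders per (window_start, store_id)."""
    counts: Counter = Counter()
    for order in orders:
        if not is_valid(order):
            continue  # drop records that fail the quality rule
        # Align the event timestamp to the start of its tumbling window.
        window_start = order["ts"] - order["ts"] % WINDOW_SECONDS
        counts[(window_start, order["store_id"])] += 1
    return dict(counts)


# Made-up sample records, timestamps in seconds.
SAMPLE = [
    {"ts": 10, "store_id": "S1", "amount": 25.0},
    {"ts": 20, "store_id": "S1", "amount": 40.0},
    {"ts": 30, "store_id": "", "amount": 15.0},    # missing store id
    {"ts": 40, "store_id": "S2", "amount": -5.0},  # invalid amount
    {"ts": 310, "store_id": "S2", "amount": 60.0},
]

if __name__ == "__main__":
    print(orders_per_store(SAMPLE))
```

Because invalid records are dropped before aggregation, an agent reading the result never sees the malformed orders.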
Let’s apply these concepts to a real-time agentic workflow for a retail store looking to better understand its customers’ behavior. Imagine we have a clothing store which takes and fulfills orders. Over time, historical order data accumulates and reveals customer buying behavior: which products sell out fast, which customers return, and which age groups order the most clothes. Insights from this historical data help the store decide on a medium- to long-term strategy for introducing a new clothing line and devise plans to maximize revenue. At the same time, real-time shopping data reflects how customers are buying products as it happens. Real-time insights help the store attract repeat purchases and up-sell through instant promotions.
Let’s assume that the historical data is stored as Apache Iceberg tables and the real-time behavior is stored in Kafka. Combining these two sources would give the store an accurate view of buyer behavior in real time, informed by past habits. To do this, AI agents would need to extract information from an Online Analytical Processing (OLAP) system and from Apache Kafka. To illustrate this, let’s say the OLAP system consists of Parquet files kept in an Apache Iceberg lakehouse and queried by DuckDB, while the real-time stream is consumed from Apache Kafka. Two AI agents, one querying through a DuckDB MCP server and the other through a Kafka MCP server, would fetch the required information. An orchestrator AI agent would combine the results and return a structured JSON output for any natural language query initiated by the user.
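The fan-out and combine step of the orchestrator could look roughly like the sketch below. Both agent functions are stubs that return made-up placeholder values instead of calling an LLM or an MCP server, and all function names and JSON keys are hypothetical; the point is only the shape of the orchestration.

```python
import asyncio
import json


async def historical_agent(question: str) -> dict:
    """Stub for the agent that would query Iceberg tables via a DuckDB MCP server."""
    # Placeholder values, not real results.
    return {"source": "duckdb", "top_age_group": "25-34", "repeat_customer_rate": 0.42}


async def realtime_agent(question: str) -> dict:
    """Stub for the agent that would consume Kafka topics via a Kafka MCP server."""
    # Placeholder values, not real results.
    return {"source": "kafka", "busiest_store": "S1", "orders_last_5_min": 17}


async def orchestrate(question: str) -> str:
    """Send the question to both agents concurrently and merge the answers into JSON."""
    historical, realtime = await asyncio.gather(
        historical_agent(question), realtime_agent(question)
    )
    combined = {"question": question, "historical": historical, "realtime": realtime}
    return json.dumps(combined, indent=2)


if __name__ == "__main__":
    print(asyncio.run(orchestrate("Which store is busiest right now?")))
```

Running the two agents with `asyncio.gather` means the slower of the two lookups, rather than their sum, determines the response time.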
An agentic framework such as LangGraph or AWS Strands could invoke tools on these MCP servers and get the required information. On top of the agentic framework, an orchestrator agent could combine responses from these MCP servers and present unique details about store and customer buying behavior in real time.
For a Python AI developer, the first step in designing this solution is to define the FastMCP tools for the Kafka and DuckDB MCP servers. Next, the agentic invocations need to be built, followed by the API endpoints through which callers engage with these agents. The last step is to add observability features and evaluate AI responses to improve the agentic flow and make it trustworthy. Evaluating the agents’ final responses against standards, compliance requirements and benchmarks remains an important step in ensuring user acceptance. Since most of the enrichment, cleaning and aggregation of the real-time data happens at the Flink SQL layer, developing the MCP tools and the invocation layer involves only pure Python development.
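As one small piece of the evaluation step, a structural check on the orchestrator’s JSON answer can be sketched as follows. This is a minimal example under stated assumptions: the required field names in `REQUIRED_FIELDS` and the `evaluate_response` function are hypothetical, and a real evaluation would also cover answer quality, compliance rules and benchmarks.

```python
import json

# Hypothetical contract for the orchestrator's JSON answer.
REQUIRED_FIELDS = {"question": str, "historical": dict, "realtime": dict}


def evaluate_response(raw: str) -> list[str]:
    """Return a list of problems found; an empty list means the response passed."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(payload, dict):
        return ["top-level value is not a JSON object"]
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in payload:
            problems.append(f"missing field: {field}")
        elif not isinstance(payload[field], expected_type):
            problems.append(f"wrong type for field: {field}")
    return problems


if __name__ == "__main__":
    print(evaluate_response('{"question": "q", "historical": {}, "realtime": {}}'))
    print(evaluate_response('{"question": "q", "historical": []}'))
```

Logging the returned problem list alongside each request is one simple way to feed the observability step mentioned above.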
Python developers familiar with Kafka and Flink can build agentic workflows like the one above while staying within the familiar territory of FastAPI, FastMCP and open source agentic frameworks like LangGraph and AWS Strands. With modern Python clients for Kafka and the rich tooling around MCP and AI agents, it’s possible to introduce an agentic workflow into the existing microservices of an enterprise application.
For modern Python developers, the path to building sophisticated AI agents doesn't require a total architectural overhaul. By combining FastMCP for MCP tool discovery with Apache Kafka and Apache Flink SQL for real-time data integrity, you can ensure your agents are powered by clean, trustworthy data. Whether you are using LangGraph or AWS Strands, the shift toward agentic workflows is essentially an evolution of the microservices patterns you already know. By treating AI agents as an extension of your existing Kafka and Flink infrastructure, you can move from simple data streaming to delivering real-time, intelligent insights.
Chatting With Kafka and Flink - AI Agents and Confluent MCP for Python Application Developers
Kafka and Flink from an application developer’s perspective (150 words)
- Application developers build microservices for business applications,
- Kafka as the de-facto standard for stream storage
- Easy to use as a messaging hub for microservices talking to each other - cart API <-> order creation API <-> loyalty API <-> order fulfillment API, etc.
- Flink creates enriched, transformed and high quality aggregated data within Kafka topics
What if an AI agent could be embedded within microservices? (150 words)
- Microservices would always have REST endpoints
- Introduce `/chat` as the endpoint for chatting with agents
- Other endpoints would build the business logic, which would be passed to `/chat` as context
MCP (Model Context Protocol) - the language AI agents speak (200 words)
- Expose Kafka and Flink tools via MCP streamable-http transport mode
- Agents would introspect and discover tools and use the LLM to match the user query to a specific tool
Retail store behavior agent - an example (400 words)
- Explain the setup - a retailer accepting fast orders through stores
- Run Confluent MCP - describe the GitHub repo of mcp-confluent and its Kafka- and Flink-specific tools
- The agent lists the Kafka and Flink tools and introspects them with the LLM’s help.
- The agent answers the question “which store accepted the most orders in the last 5 minutes”.
- The agent calls MCP tools to consume Kafka messages and responds.