In this article, I'll show how the Argo Workflows Executor Plugin lets you extend the Argo Workflows controller without maintaining your own fork—simply by implementing a small HTTP server in any language. As a bonus, this same mechanism reduces the number of extra pods in your DAGs and lightens the load on the Kubernetes scheduler. If you're new to Argo, I'll briefly cover the architecture and where plugins fit in. We'll finish with practical examples and key configuration details.

Motivation

I decided to write this article about the Argo Workflows Executor Plugin while working on an enhancement proposal (KEP) for Kubeflow Pipelines, where I needed a deeper understanding of the plugin's security model, capabilities, and limitations.

TL;DR: Argo Workflows

Before diving into Executor Plugins, let’s quickly recap what Argo Workflows is and how it executes workflows.


What is a Workflow?


A workflow is a sequence of tasks executed in a defined order and typically described in YAML. It can be a simple linear chain or a complex DAG with parallel steps and dependencies. Workflows are common in CI/CD, data processing, ML pipelines, and any scenario that requires coordinating multiple tasks reliably.


Argo Workflows is a Kubernetes-native engine that takes these YAML-defined workflows and executes each task in its own pod. This provides parallelism, explicit dependencies, and precise resource control, while leveraging Kubernetes scaling, scheduling, and reliability. With this model, Argo can orchestrate anything from simple automation to full CI/CD and ML pipelines.

Inside Argo Workflows: Architecture Overview

Let’s look at a simple three-step YAML from the official docs to see how Argo Workflows is structured and how its components interact.

Interactions with Argo Workflows typically start via the Argo CLI.


For example, suppose I’ve downloaded a workflow YAML description called hello.yaml. I can create an Argo Workflow from it by running:

argo submit hello.yaml

Under the hood, the CLI sends the YAML definition to the Kubernetes API, which creates an Argo Workflow Custom Resource (CR).
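
Since a Workflow is just a custom resource, the same thing can be done without the Argo CLI (a sketch, assuming the argo namespace; kubectl create is used here because the manifest relies on generateName):

kubectl -n argo create -f hello.yaml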

Once Kubernetes creates a Workflow CR, the Workflow Controller starts managing its lifecycle, as shown below.


At a high level, the WorkflowController in Argo Workflows v3.7.3 operates with two informers: one for Workflow CRs and one for the Pods created during workflow execution. Both informers push keys into their respective work queues for reconciliation.
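
You can observe both kinds of objects the informers watch (assuming the argo namespace and the standard label Argo puts on workflow pods):

kubectl -n argo get workflows.argoproj.io
kubectl -n argo get pods -l workflows.argoproj.io/workflow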


Every Workflow Step Is a Kubernetes Pod (until you introduce an Executor Plugin)

This design comes with a lot of benefits. Since each step runs in its own pod, you can fully leverage the Kubernetes scheduler, pre-allocate or limit resources for individual tasks, and take advantage of pod restarts for reliability. You also have full access to Kubernetes primitives, such as volumes, networking, and other resources, giving you complete control over workflow execution. On top of that, you get out-of-the-box access to logs, since each pod’s logs can be inspected directly.


This setup is especially well suited to heavy or relatively long-running tasks, where visibility, resource control, and fault isolation are crucial for keeping the cluster stable and preventing interference with other workloads.


But how does this work in practice? In a workflow, each step is based on a template. Let’s look at an example from the official documentation:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and print-message
  templates:
    - name: hello-hello-hello
      # Instead of just running a container
      # This template has a sequence of steps
      steps:
        - - name: hello1            # hello1 is run before the following steps
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello1"
        - - name: hello2a           # double dash => run after previous step
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2a"
          - name: hello2b           # single dash => run in parallel with previous step
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2b"

    # This template prints the message passed to it
    - name: print-message
      inputs:
        parameters:
          - name: message

      container:
        image: busybox
        command: [echo]
        args: ["{{inputs.parameters.message}}"]


Here we can see a container template, which means a separate pod is created for each step, as shown below.

kubectl -n argo get po
NAME                                   READY   STATUS      RESTARTS   AGE
steps-6zjdt-print-message-227836356    0/2     Completed   0          60s
steps-6zjdt-print-message-3558628268   0/2     Completed   0          50s
steps-6zjdt-print-message-3608961125   0/2     Completed   0          50s


Pods Come with Overhead

However, if some of our tasks are lightweight or execute very quickly, the “one pod per task” approach has drawbacks: every pod adds scheduling latency, image pulls, sidecar startup, and per-pod resource overhead.


In practice, this means you want separate pods for long‑running or heavy steps, while lightweight tasks are better off sharing a long‑lived pod that can be reused across many steps.

So, How Do We Reduce Pod Creation Overhead?

Executor Plugins were introduced to address exactly this class of problems: they let you move step execution out of per‑step pods into a reusable agent pod, without modifying the core controller or maintaining a fork.


At a high level, instead of creating a new pod for each step with template: container, a template: plugin creates a single agent pod that is reused for all steps using that plugin while the workflow is running.

Why run user code in a separate agent pod instead of the Workflow Controller?

Running user code in a separate agent pod has several benefits: the controller stays isolated from plugin crashes and resource leaks, each plugin can be given its own resource requests and limits, and the plugin can run under its own narrowly scoped ServiceAccount rather than the controller's far more privileged one.


Prerequisites for the Executor Plugin

Simply changing a template from container to plugin isn’t enough; some preparation is required first.


Let’s look at a high-level overview of how the Executor Plugin works. Components specific to the Executor Plugin are highlighted in green on the diagram:

To get your plugin up and running, you need to follow a few key high-level steps: enable plugins in the Workflow Controller, implement an HTTP server with the plugin logic, inject the plugin into the cluster, add RBAC permissions for WorkflowTaskSet, secure the plugin so it only accepts requests from the controller, and, if the plugin calls the Kubernetes API, mount a ServiceAccount into the agent pod. Details for each step are provided below.


First, here’s a link to the repository where I implemented all of these steps so the plugin can be used; I’ll be using examples from this repo in the sections below.

Configure the Argo Workflow Controller

By default, the Argo Workflow Executor Plugin is disabled in the Workflow Controller.


As a result, the Workflow Controller ignores any plugin configurations added to the cluster. To enable it, simply set the environment variable ARGO_EXECUTOR_PLUGINS=true on the Workflow Controller.


In my demo repository, I do this by patching the Workflow Controller as shown below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-controller
  namespace: argo
spec:
  template:
    spec:
      containers:
        - name: workflow-controller
          env:
            # Enable the Executor Plugin.
            # The Executor Plugin is disabled by default.
            # Without this environment variable, the workflow-controller
            # will not register any executor plugins.
            - name: ARGO_EXECUTOR_PLUGINS
              value: "true"


Once enabled, the Workflow Controller uses a special informer to watch all new or updated ConfigMaps containing plugin settings.

It automatically registers any Executor Plugins defined in these ConfigMaps, based on the following label:

kubectl get cm -l workflows.argoproj.io/configmap-type=ExecutorPlugin

Implement a server

The plugin's logic is implemented in an HTTP server; the plugin is the extension point for the Argo Workflow Controller.


The Argo Workflow Controller communicates with the plugin via an API contract: for each plugin step, it POSTs the step's template to the plugin's /api/v1/template.execute endpoint, and the plugin replies with the resulting node status.


Here is a brief sketch of the response:

{
  "node": {
    "phase": "Succeeded", // Use "Failed" to mark the step as failed
    // other parameters, e.g., outputs
  }
}


Important: To mark a step as failed, the server should return "phase": "Failed". HTTP errors are treated as unexpected issues by the controller and split into transient (retried) and non-transient categories. For more details, refer to the documentation.
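
For example, a step can be failed with an explanatory message (a sketch; message is a field of the node result that surfaces in the workflow status):

{
  "node": {
    "phase": "Failed",
    "message": "input validation failed"
  }
}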


Here’s an example of a simple server implementation written in Python:

from fastapi import FastAPI, Request, HTTPException
import uvicorn

app = FastAPI()

@app.post("/api/v1/template.execute")
async def execute(request: Request):
    data = await request.json()

    print_message_plugin_request = data.get("template", {}).get("plugin", {}).get("print-message-plugin")
    if print_message_plugin_request is None:
        return None

    if print_message_plugin_request.get('args') is None:
        raise HTTPException(status_code=400, detail=f"invalid request body. "
                                                    f"Expecting template.plugin.print-message-plugin.args, found: f{data}")
    message = print_message_plugin_request.get('args')

    print(f"PRINT: {message}" if message else "No message")

    return {
        "node": {
            "phase": "Succeeded",
            "outputs": {
                "parameters": [{"name": "result", "value": f'{message} processed by print-message-plugin'}]
            }
        }
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)


It simply prints a message and returns a string as an output parameter, which can be used as an input for subsequent steps or DAG tasks. This is essentially our extension point.
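
For instance, a follow-up step could consume that output (a sketch, assuming a preceding step named hello1 that uses this plugin template):

        - - name: hello2
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "{{steps.hello1.outputs.parameters.result}}"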


About the Request Parameter

If you submit the following Argo Workflow:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-plugin-
spec:
  entrypoint: hello-hello-hello

  templates:
    - name: hello-hello-hello
      steps:
        - - name: hello1
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello1"

    - name: print-message
      inputs:
        parameters:
          - name: message
      plugin:
        print-message-plugin:
          args: ["{{inputs.parameters.message}}"]


Then the controller resolves the input parameters and sends the template to the plugin; inside the request body, the plugin's configuration can be found at:

template.plugin.print-message-plugin.args: <value>
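
For illustration, the relevant part of the request body would look roughly like this (a trimmed sketch; the real payload also carries workflow metadata under a workflow key):

{
  "workflow": { "metadata": { "name": "steps-plugin-xxxxx" } },
  "template": {
    "name": "print-message",
    "inputs": { "parameters": [{ "name": "message", "value": "hello1" }] },
    "plugin": {
      "print-message-plugin": { "args": ["hello1"] }
    }
  }
}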


Explanation: under template.plugin there is one key per plugin name, and its value is whatever configuration the workflow author put into the template; here it is the args array with the message parameter already resolved to "hello1".

Inject the Plugin Server Implementation into an Argo Workflow

Here, everything is simple and follows the standard Argo Workflow approach: package your server as a container image, describe it in a plugin.yaml manifest of kind ExecutorPlugin, and generate a ConfigMap from that manifest with the Argo CLI.



Here is a sample plugin.yaml for the print-message-plugin:

apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: print
spec:
  sidecar:
    container:
      image: print-message-plugin:latest
      name: print-message-plugin
      ports:
        - containerPort: 8080
      securityContext:
        runAsNonRoot: false
        runAsUser: 65534 # nobody
      resources:
        requests:
          memory: "64Mi"
          cpu: "250m"
        limits:
          memory: "128Mi"
          cpu: "500m"

According to the documentation, you can then build the ConfigMap from this manifest:

argo executor-plugin build .


The Workflow Controller will pick up the plugin automatically. When an Argo Workflow runs, it will execute our plugin image inside a dedicated agent pod.

Add additional RBAC permissions for WorkflowTaskSet

This is necessary for updating workflow statuses: the agent pod reports step results back to the controller by patching the status of a WorkflowTaskSet resource, as sketched below.
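
A minimal Role along these lines should cover it (a sketch based on the agent role shipped with Argo's quick-start manifests; bind it to the ServiceAccount your workflows run as):

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: agent
  namespace: argo
rules:
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtasksets
    verbs:
      - list
      - watch
  - apiGroups:
      - argoproj.io
    resources:
      - workflowtasksets/status
    verbs:
      - patch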

Secure the plugin to only accept requests from the Argo Workflow Controller

As noted, our plugin runs as a sidecar container inside a dedicated pod and exposes an HTTP endpoint that is reachable from inside the cluster.

This means that, in theory, any workload in the cluster could send HTTP requests to the plugin sidecar, so it is important to add basic access controls instead of relying on defaults.


The minimal change we need to make in plugin.yaml:

apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: print
spec:
  sidecar:
    container:
      #...
      securityContext:
        #...
        runAsUser: 1000  # The sidecar must run as a user that can read the token mounted by the controller; the 'nobody' user from the earlier example no longer works.
      #...


By default (without extra customization), the Argo Workflow Controller mounts the /var/run/argo volume into the agent pod (which hosts the plugin sidecar). This volume contains a token.


Additionally, the controller sends this token as a Bearer token in the Authorization header of every request it makes to the plugin.

Inside the plugin, you need to read the token from /var/run/argo/token and compare it with the token from the request header.

# In the FastAPI server from earlier: read the mounted token once at startup...
with open("/var/run/argo/token") as f:
    token = f.read().strip()

@app.post("/api/v1/template.execute")
async def execute(request: Request):
    # ...and reject any caller that does not present it.
    if request.headers.get("Authorization") != f"Bearer {token}":
        raise HTTPException(status_code=403, detail="forbidden")
    # ... handle the template as before ...


Mount the ServiceAccount to the agent pod

Kubernetes uses a default-deny model for API access. If your plugin needs to call the Kubernetes API (for example, to list pods or create resources), you must mount a ServiceAccount into the sidecar container and grant it the minimal required RBAC permissions.


Let’s walk through a simple example: writing a plugin that calls the Kubernetes API and prints the list of pods in the namespace specified in the request.


Here is an example server implementation.

from fastapi import FastAPI, Request, HTTPException
import uvicorn

import os

from kubernetes import config, client

app = FastAPI()

config.load_incluster_config()
k8s = client.CoreV1Api()

def list_pods_in_namespace(namespace: str):
    # This is where we query the Kubernetes API to list pods.
    # The ServiceAccount must be mounted into the pod and have the required RBAC permissions
    # to list pods in the specified namespace.
    pods = k8s.list_namespaced_pod(namespace=namespace)
    return [p.metadata.name for p in pods.items]

@app.post("/api/v1/template.execute")
async def execute(request: Request):
    data = await request.json()

    plugin_request = data.get("template", {}).get("plugin", {}).get("list-pods-plugin")
    if plugin_request is None:
        return None
    if plugin_request.get('namespace') is None:
        raise HTTPException(400, 'namespace parameter is missing')
    namespace = plugin_request.get('namespace')

    print(f'namespace: {namespace}')

    pod_list = list_pods_in_namespace(namespace)

    return {
        "node": {
            "phase": "Succeeded",
            "outputs": {
                "parameters": [{"name": "result", "value":  f'pods in ns {namespace}: {",".join(pod_list)}'}]
            }
        }
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8082)


To mount the ServiceAccount token into the executor plugin sidecar, automountServiceAccountToken must be set to true in the plugin spec.


Here is an example YAML showing how to mount the ServiceAccount token into the executor plugin sidecar:

apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: list-pods-plugin
spec:
  sidecar:
    automountServiceAccountToken: true 
    container:
      image: list-pods-plugin:latest
      name: list-pods-plugin
      ports:
        - containerPort: 8082
      securityContext:
        runAsNonRoot: false
        runAsUser: 1001
      resources:
        requests:
          memory: "64Mi"
          cpu: "250m"
        limits:
          memory: "128Mi"
          cpu: "500m"


Reminder: to obtain the ConfigMap required to add the plugin to the cluster, you need to run the following command.

argo executor-plugin build .


Afterwards, apply the created ConfigMap to the cluster.


The executor plugin then runs with its own dedicated ServiceAccount, separate from the one used by the Argo Workflow. This approach improves security by allowing fine-grained control over the permissions granted to the executor plugin. The ServiceAccount name is predefined and cannot be customized. It is always set to: <pluginName>-executor-plugin (for example, list-pods-plugin-executor-plugin in this case).


With the token mounted, the executor plugin can authenticate with the Kubernetes API and perform actions allowed by its ServiceAccount.
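
For a rough idea (resource names here are illustrative; the full setup lives in the demo repo), the Role and RoleBinding for the list-pods example could look like:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: pod-reader  # illustrative name
  namespace: argo
rules:
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - list
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: list-pods-plugin-pod-reader  # illustrative name
  namespace: argo
subjects:
  - kind: ServiceAccount
    name: list-pods-plugin-executor-plugin  # the predefined ServiceAccount name from above
    namespace: argo
roleRef:
  kind: Role
  name: pod-reader
  apiGroup: rbac.authorization.k8s.io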

The remaining RBAC details are not covered here. For everything needed to try out the plugins, see my demo repo with instructions: here.

Handling Multiple Plugins

When multiple Executor Plugins are installed in the cluster (for example, from different teams), the Argo Workflow Controller calls them sequentially until one handles the task.


Always check the plugin name in your server:

@app.post("/api/v1/template.execute")
async def execute(request: Request):
    data = await request.json()
    
    # Check if this is OUR plugin
    my_plugin_request = data.get("template", {}).get("plugin", {}).get("print-message-plugin")
    if my_plugin_request is None:
        return None  # ← Pass to the next plugin
    
    # Execute logic only for our plugin name
    # ...

Why this matters: the controller sends each plugin step to every registered plugin; if your server returns a node result for a request that actually belongs to a different plugin, the controller treats that step as handled and never consults the remaining plugins. Returning an empty result passes the request on to the next plugin.

See the discussion here.

Final Example: One Pod Instead of Three

Now that the plugin is properly installed and configured, let's run the same steps workflow, but this time using our plugin.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-plugin-
spec:
  entrypoint: hello-hello-hello

  templates:
    - name: hello-hello-hello
      steps:
        - - name: hello1
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello1"
        - - name: hello2a
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2a"
          - name: hello2b
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2b"

    - name: print-message
      inputs:
        parameters:
          - name: message
      plugin:
        print-message-plugin:
          args: ["{{inputs.parameters.message}}"]


Before (standard template: container):

kubectl -n argo get po
NAME                                   READY   STATUS      RESTARTS   AGE
steps-6zjdt-print-message-227836356    0/2     Completed   0          60s
steps-6zjdt-print-message-3558628268   0/2     Completed   0          50s
steps-6zjdt-print-message-3608961125   0/2     Completed   0          50s


After (using template: plugin):

kubectl -n argo get po
NAME                                  READY   STATUS    RESTARTS   AGE
steps-plugin-7fkgk-1340600742-agent   4/4     Running   0          19s

The same workflow now runs all three steps on a single reusable agent pod, reducing pod creation overhead and scheduler load.

HTTP Template (Bonus Alternative)

There's also the http template: a simpler alternative to full plugins when you just need to make an HTTP request and use the response.

Like plugin steps, http steps are executed by the agent pod rather than by per-step pods, but they require no custom server code.
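
As a rough sketch of what such a template looks like (the URL and names are illustrative):

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: http-
spec:
  entrypoint: fetch-status
  templates:
    - name: fetch-status
      http:
        url: "https://example.com/api/status"  # illustrative endpoint
        method: "GET"
        successCondition: "response.statusCode == 200"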

When NOT to Use Executor Plugins

As discussed earlier, heavy or long-running tasks are better served by regular container templates: dedicated pods give you fault isolation, per-step resource control, and full use of the Kubernetes scheduler, all of which are lost when many steps share a single agent pod.

Real-World Use Cases from the Community

Conclusion

Executor Plugins are a powerful way to extend Argo Workflows without forking the controller: they enable integration with external systems over plain HTTP and eliminate per-step pod churn for lightweight tasks.


Featured image: Photo by Ian Taylor on Unsplash