A practical spectrum from scripted tasks to goal-seeking systems (LoA-1 → LoA-3).

If Agents & Process Automation 101 was the why and Levels of Autonomy (LoA) for Enterprise Automation was the dial, this is the wiring you can ship, now tightened to match how you’ll actually deploy: one generic agent, SOP packaged with each model version, and the caller chooses the version (e.g., @Champion/@Challenger or separate endpoints). Tools are Unity Catalog (UC) functions only. Policy lives in YAML. We finish with a standardised Assurance Gate and real telemetry.

This article is meant for a more technical audience compared to the previous ones; architects and engineers who want a blueprint they can ship easily.

TL;DR

Store SOPs in a UC Volume and bundle one with each model version. Keep it contract‑first: planner limits, allow‑listed tools, memory, CEL gates, judges.

name: "vendor_onboarding"
goal: "Onboard vendor if compliant"
loa: 3

planner:
  model: "databricks-gpt-oss-20b"    # or "databricks-gpt-oss-120b"
  base_url: "https://<workspace-host>/serving-endpoints/databricks-gpt-oss-20b/invocations"
  temperature: 0.2
  max_steps: 6
  max_reflections: 2
  never_guess: true
  system_prompt: |
    You are a control-plane planner. At each turn propose ONE next step as STRICT JSON:
    {"use": <tool_name>, "args": {...}, "stop": true|false, "reason": "<why>"}
    Use ONLY the allowed tools and their schemas. If unsafe/unknown, set stop=true.
    If the last tool failed:
      • Retry only when idempotent and policy allows; otherwise propose an alternative or stop ("HITL").

tools:
  - name: LookupCompany
    kind: sql_function
    uc_fn: "procure.tools.lookup_company"
    description: "Resolve a company by legal name; returns {id,legal_name,country}."
    input_schema: { type: object, properties: { name: { type: string } }, required: [name] }

  - name: SanctionsCheck
    kind: model_function
    uc_model: "procure.tools.sanctions_check"
    uc_endpoint: "https://<workspace>/serving-endpoints/sanctions_check/invocations"
    description: "KYC screen; returns {sanctions:'clear'|'flag', evidence:[uri...]}"
    input_schema: { type: object, properties: { entity_id: { type: string } }, required: [entity_id] }

  - name: CreateVendor
    kind: model_function
    uc_model: "procure.tools.create_vendor"
    uc_endpoint: "https://<workspace>/serving-endpoints/create_vendor/invocations"
    description: "Create vendor in ERP; args: {company:{...}}; returns created record."
    input_schema: { type: object, properties: { company: { type: object } }, required: [company] }

guardrails:
  uc_functions:
    - "procure.governance.validate_vendor"   # structural/PII check
  thresholds:
    amount_max: 1000
  gates_cel:
    - if: "input.amount > thresholds.amount_max"   then: "require_hitl"
    - if: "size(ctx.SanctionsCheck.evidence) == 0" then: "fail_and_retry"

failure:
  default:
    max_retries: 1
    backoff_ms: [250]
    retryable: ["timeout", "rate limit", "429", "temporarily unavailable", "503"]
    non_retryable: ["permission", "invalid argument", "validation", "not found"]
  per_tool:
    CreateVendor:
      max_retries: 0
    LookupCompany:
      max_retries: 2
      backoff_ms: [250, 500]

judging:
  pass_threshold: 0.82
  rubric: ["policy_adherence", "source_citations", "numerical_consistency"]

observability:
  traces: "mlflow+otel"
  audit: "cloudevents.v1"

Why this works: UC‑governed tools + YAML policy + a single generic agent—you get EXECUTE/endpoint perms, audit/lineage by default; thresholds move via PR, not code; and swapping SOPs is a version bump, not a new code path.


Agent core (LangChain tool‑calling; Assurance Gate at the end)

Here’s the on-rails core the rest hangs on: a single MLflow LangChain model, built from an SOP (YAML) bundled with the version, drives a LangChain tool-calling agent that’s constrained to an allow-list of Unity Catalog tools. Inputs to each tool are typed (JSON Schema → Pydantic), and the loop runs plan → act → observe, with no off-catalog calls. Before anything commits, an Assurance Gate fires: UC structural/PII checks, then CEL policy thresholds from the SOP, then MLflow judges (groundedness/relevance/safety). Pass and it auto-finalizes; miss and it routes to HITL.

# agent/sop_agent_langchain_chain.py
# LangChain chain that:
# 1) builds a tool-calling AgentExecutor from SOP
# 2) runs it
# 3) applies the Assurance Gate (UC guardrail fn → CEL → MLflow judges)

from __future__ import annotations
import os, json, yaml, time, requests
from typing import Any, Dict, List
from collections import defaultdict
from databricks import sql
from jsonschema import validate as js_validate
from celpy import Environment as CELEnv

from pydantic import BaseModel, create_model
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import StructuredTool
from langchain_core.runnables import RunnableLambda
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI

from mlflow.genai.judges import is_grounded, is_safe, is_context_relevant

# ───────────── UC helpers ─────────────
def _sql_connect():
    return sql.connect(
        server_hostname=os.getenv("DB_HOST"),
        http_path=os.getenv("DB_WAREHOUSE"),
        access_token=os.getenv("DB_TOKEN"),
    )

def call_uc_sql_function(fn_path: str, args: Dict[str, Any]) -> Any:
    with _sql_connect() as c:
        cur = c.cursor()
        placeholders = ",".join(["?"] * len(args))
        cur.execute(f"SELECT {fn_path}({placeholders})", tuple(args.values()))
        return cur.fetchone()[0]

def call_uc_model(endpoint_url: str, inputs: Dict[str, Any]) -> Any:
    r = requests.post(
        endpoint_url,
        headers={"Authorization": f"Bearer {os.getenv('DATABRICKS_TOKEN')}",
                 "Content-Type": "application/json"},
        json={"inputs": inputs}, timeout=60)
    r.raise_for_status()
    return r.json()

def uc_validate_payload(fn_path: str, payload_json: str) -> bool:
    with _sql_connect() as c:
        cur = c.cursor()
        cur.execute(f"SELECT {fn_path}(?)", (payload_json,))
        return bool(cur.fetchone()[0])

# ───────────── CEL + judges ─────────────
def cel_decision(gates: List[dict], cel_ctx: Dict[str, Any]) -> str:
    env = CELEnv()
    for g in gates:
        if env.compile(g["if"]).program().evaluate(cel_ctx):
            return g["then"]
    return "allow"

def judge_mlflow(proposal: Dict[str, Any], pass_q: float,
                 request: str, response: str, context: Any) -> Dict[str, Any]:
    def yes(x):
        v = getattr(getattr(x, "feedback", None), "value", "no")
        return 1.0 if str(v).lower() == "yes" else 0.0
    parts = {
        "groundedness": yes(is_grounded(request=request, response=response, context=context)),
        "context_relevance": yes(is_context_relevant(request=request, context=context)),
        "safety": yes(is_safe(content=response)),
    }
    score = sum(parts.values()) / max(1, len(parts))
    return {"score": score, "verdict": "pass" if score >= float(pass_q) else "fail"}

# ───────────── JSON Schema → Pydantic for tool signatures ─────────────
_JSON_TO_PYD = {"string": str, "number": float, "integer": int, "boolean": bool, "object": Dict[str, Any], "array": List[Any]}

def _schema_to_pydantic(name: str, schema: Dict[str, Any]) -> BaseModel:
    props, req = schema.get("properties", {}) or {}, set(schema.get("required", []) or [])
    fields = {k: (_JSON_TO_PYD.get(spec.get("type", "object"), Dict[str, Any]), (... if k in req else None))
              for k, spec in props.items()} or {"payload": (Dict[str, Any], ...)}
    return create_model(name, **fields)

# ───────────── Tools with SOP-driven retry/backoff + error observation ─────────────
def _make_tool(spec: Dict[str, Any],
               failure_cfg: Dict[str, Any],
               attempts: defaultdict) -> StructuredTool:
    ArgsModel = _schema_to_pydantic(f"{spec['name']}Args", spec["input_schema"])

    per_tool = (failure_cfg.get("per_tool", {}) or {}).get(spec["name"], {})
    default  = failure_cfg.get("default", {}) or {}
    max_retries = int(per_tool.get("max_retries", default.get("max_retries", 0)))
    backoff_ms  = list(per_tool.get("backoff_ms", default.get("backoff_ms", [])))
    retryable   = [s.lower() for s in default.get("retryable", [])]
    nonretry    = [s.lower() for s in default.get("non_retryable", [])]

    def _run(**kwargs):
        js_validate(kwargs, spec["input_schema"])
        attempt = 0
        while True:
            try:
                out = call_uc_sql_function(spec["uc_fn"], kwargs) if spec["kind"] == "sql_function" \
                      else call_uc_model(spec["uc_endpoint"], kwargs)
                attempts[(spec["name"], "ok")] += 1
                return out if isinstance(out, (dict, list, str, int, float, bool)) \
                           else json.loads(json.dumps(out, default=str))
            except Exception as e:
                msg = str(e)
                low = msg.lower()
                attempts[(spec["name"], "err")] += 1

                # 1) hard stop (non-retryable) → return error observation
                if any(s in low for s in nonretry):
                    return {"_error": {
                        "tool": spec["name"], "message": msg[:500],
                        "retryable": False, "attempts": attempt, "policy_max_retries": max_retries
                    }}

                # 2) retries exhausted → return error observation
                if attempt >= max_retries:
                    return {"_error": {
                        "tool": spec["name"], "message": msg[:500],
                        "retryable": any(s in low for s in retryable), "attempts": attempt,
                        "policy_max_retries": max_retries
                    }}

                # 3) looks retryable? backoff a bit and try again
                if retryable and any(s in low for s in retryable):
                    delay = backoff_ms[attempt] if attempt < len(backoff_ms) else 250
                    time.sleep(min(delay, 1000) / 1000.0)
                    attempt += 1
                    continue

                # 4) not clearly retryable → return error observation
                return {"_error": {
                    "tool": spec["name"], "message": msg[:500],
                    "retryable": False, "attempts": attempt, "policy_max_retries": max_retries
                }}

    return StructuredTool.from_function(
        name=spec["name"],
        description=spec.get("description", f"UC tool {spec['name']}"),
        args_schema=ArgsModel,
        func=_run,
        return_direct=False,
    )


def _build_agent_executor(sop: Dict[str, Any]) -> AgentExecutor:
    # tools
    attempts = defaultdict(int)
    failure_cfg = sop.get("failure", {"default": {"max_retries": 0}})
    tools = [_make_tool(t, failure_cfg, attempts) for t in sop["tools"]]
    # planner LLM (Databricks GPT-OSS via OpenAI-compatible endpoint)
    llm = ChatOpenAI(
        model=sop["planner"]["model"],
        base_url=sop["planner"]["base_url"],
        api_key=os.getenv("DATABRICKS_TOKEN") or os.getenv("LLM_API_KEY"),
        temperature=float(sop["planner"].get("temperature", 0.2)),
    )
    prompt = ChatPromptTemplate.from_messages([
        ("system", sop["planner"]["system_prompt"].strip()),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ])
    agent = create_tool_calling_agent(llm, tools, prompt)
    return AgentExecutor(
        agent=agent,
        tools=tools,
        max_iterations=int(sop["planner"].get("max_steps", 6)),
        return_intermediate_steps=True,
        handle_parsing_errors=True,
        verbose=False,
    )

# ───────────── Chain factory (Runner) ─────────────
def build_chain_from_sop(sop_path: str):
    sop = yaml.safe_load(open(sop_path))
    executor = _build_agent_executor(sop)

    def _invoke_agent(user_input: Dict[str, Any]):
        # 1) invoke agent
        block = {
            "goal": sop["goal"],
            "constraints": {"never_guess": bool(sop["planner"].get("never_guess", True)),
                            "max_steps": int(sop["planner"].get("max_steps", 6))},
            "input": dict(user_input),
            "tools": [t["name"] for t in sop["tools"]],
        }
        res = executor.invoke({"input": json.dumps(block, ensure_ascii=False)})

        # 2) collect outputs (latest per tool)
        ctx_map: Dict[str, Any] = {}
        for action, output in res.get("intermediate_steps", []):
            name = getattr(action, "tool", "tool")
            try:
                ctx_map[name] = json.loads(output) if isinstance(output, str) else output
            except Exception:
                ctx_map[name] = output

        # 3) Assurance Gate
        proposal = {"input": dict(user_input), "ctx": ctx_map}
        payload = json.dumps(proposal)

        for fn in sop["guardrails"]["uc_functions"]:
            if not uc_validate_payload(fn, payload):
                return {"status": "FAILED_GUARDRAIL", "reason": f"UC guardrail failed: {fn}"}

        decision = cel_decision(
            sop["guardrails"]["gates_cel"],
            {"input": user_input, "ctx": ctx_map, "thresholds": sop["guardrails"]["thresholds"]}
        )

        judgment = judge_mlflow(
            proposal=proposal,
            pass_q=float(sop["judging"]["pass_threshold"]),
            request=sop["goal"],
            response=payload,
            context=ctx_map.get("SanctionsCheck", {}).get("evidence", []),
        )

        if decision in ("fail_and_retry", "require_hitl") or judgment["verdict"] == "fail":
            return {"status": "REQUIRES_HITL", "confidence": judgment["score"], "result": ctx_map}

        return {"status": "COMPLETED", "confidence": judgment["score"], "result": ctx_map}

    # LangChain expects a Runnable; wrap the callable
    return RunnableLambda(_invoke_agent)

Wrap tools as UC functions

Keep reads in SQL; wrap actions as Model‑as‑Function (MLflow pyfunc + Serving). Grant EXECUTE/endpoint perms to the agent SP. Return JSON from SQL for predictable Python handling.

Read (JSON out):

CREATE OR REPLACE FUNCTION procure.tools.lookup_company(name STRING)
RETURNS STRING
RETURN to_json(named_struct(
  'id', id, 'legal_name', legal_name, 'country', country
))
FROM (
  SELECT id, legal_name, country
  FROM procure.refdata.companies
  WHERE lower(legal_name) = lower(name)
  LIMIT 1
);
GRANT EXECUTE ON FUNCTION procure.tools.lookup_company TO `spn:procurement-agent`;

Retrieval memory (policy recall):

CREATE OR REPLACE FUNCTION procure.knowledge.recall_policy(query STRING) -- replace with vector search in prod
RETURNS STRING
RETURN to_json(named_struct('passages',
  collect_list(named_struct('id', id, 'text', passage))
))
FROM (
  SELECT id, passage
  FROM procure.knowledge.policy_index
  WHERE contains(passage, query) 
  LIMIT 5
);
GRANT EXECUTE ON FUNCTION procure.knowledge.recall_policy TO `spn:procurement-agent`;

Guardrail (structural check):

CREATE OR REPLACE FUNCTION procure.governance.validate_vendor(payload STRING)
RETURNS BOOLEAN
RETURN from_json(
  payload,
  'input STRUCT<amount DOUBLE>, ctx STRUCT<SanctionsCheck STRUCT<evidence ARRAY<STRING>>>>'
) IS NOT NULL;
GRANT EXECUTE ON FUNCTION procure.governance.validate_vendor TO `spn:procurement-agent`;

Package, register, alias (Champion/Challenger)

Each version bundles its SOP; the caller chooses which to invoke.

# register/register_agent_langchain_uc.py
import os, hashlib, mlflow
from mlflow.tracking import MlflowClient
from agent.sop_agent_langchain_chain import build_chain_from_sop

mlflow.set_registry_uri("databricks-uc")  # UC-backed registry


def register_agent_version_from_uc_volume(
    sop_path: str,
    model_name: str = "procure.platform.agent",  # catalog.schema.model
    alias: str | None = None,
) -> str:

    lc_chain = build_chain_from_sop(sop_path)

    with mlflow.start_run() as run:
        info = mlflow.langchain.log_model(
            lc_model=lc_chain,
            name="agent",
            registered_model_name=model_name,
            pip_requirements=[
                "mlflow",
                "langchain",
                "langchain-openai",
                "langchain-core",
                "requests",
                "PyYAML",
                "databricks-sql-connector",
                "cel-python",
                "jsonschema",
            ],
            tags={"sop.path": sop_path},
            input_example={"company_name": "ACME Ltd", "amount": 750},
        )

    mv = info.model_version
    if alias:
        MlflowClient().set_registered_model_alias(model_name, alias, mv.version)

    print(f"Registered {model_name} v{mv.version}" + (f"  alias={alias}" if alias else ""))
    return mv.version

if __name__ == "__main__":
    register_agent_version_from_uc_volume(
        sop_path="/Volumes/procure/policies/agent_sops/vendor_onboarding.yml",
        alias="Champion"
    )
    # register_agent_version_from_uc_volume("/Volumes/procure/policies/agent_sops/vendor_onboarding.yml", alias="Challenger")

Serve & invoke (caller picks the version)

Option A — single endpoint that points to @Champion; flip alias during rollout.
Option B — two endpoints:..._champion and ..._challenger

import mlflow.langchain
lc = mlflow.langchain.load_model("models:/procure.platform.agent@Champion")
out = lc.invoke({"company_name": "ACME Ltd", "amount": 750})

You’ll get:

{
  "status": "COMPLETED",
  "result": { "...tool outputs..." },
  "confidence": 0.86,
  "plan": [
    { "executed": { "use": "LookupCompany",  "args": {"name": "ACME Ltd"}, "ok": true } },
    { "executed": { "use": "SanctionsCheck", "args": {"entity_id": "..."}, "ok": true } }
  ]
}

How this maps to LoA (and why it scales)

Ops you’ll actually run

SLOs: STP%, judge‑pass rate, approvals/100, p95 cost & latency, policy blocks, loop kills. Promote autonomy only when the data earns it.

Next step checklist

  1. Wrap reads/actions as UC functions; grant EXECUTE/endpoint perms to the agent SP.
  2. Draft the SOP (YAML) with planner limits, allow‑listed tools, memory, CEL gates, and judge threshold.
  3. Package the generic agent with the SOP as an artifact; register in UC; set @Champion.
  4. Create a serving endpoint for Champion (and optionally Challenger).
  5. Turn on MLflow Tracing → OTel; wire CloudEvents; watch SLOs.
  6. Iterate: update SOP → create new model version → move the alias. No code changes.

Ship it

Put the SOP in UC. Wrap the tools. Package the agent once. Register two versions with their own SOPs (@Champion, @Challenger). Point your endpoint at the alias, turn on MLflow→OTel, and watch STP%, judge‑pass, and p95 latency. If the numbers hold, expand scope; if they wobble, flip the alias back. Governed tools, explicit policy, small blast radius. This is production AI. Ship it.