Keywords: Big Data, Data Scheduling, Workflow, Batch Stop

Introduction

After experimenting with Apache DolphinScheduler and using it in a real project, I ran into a problem: too many workflows got stuck in a “running” state without making progress. Manually stopping them was painfully slow and exhausting. Here’s how I solved it.

Background

The Symptoms

A large number of workflow instances appeared to be “running,” but were not executing anything. They occupied their task group slots and blocked other jobs from running. When too many accumulated, manually stopping them one by one became impractical.

Key Considerations

Before killing tasks in bulk, there were two important factors to consider; both are addressed by the solution below.

Solution

1. Using DolphinScheduler’s API

DolphinScheduler exposes REST APIs for creating, querying, and stopping workflows. By leveraging these APIs, we can automate batch termination instead of relying on manual clicks in the UI.
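
The two endpoints this solution relies on are the ones used in the script below (exact paths can vary slightly between DolphinScheduler versions, so check the API docs for your release):

GET  {BASE_URL}/projects/{projectCode}/process-instances?pageNo=1&pageSize=100&stateType=RUNNING_EXECUTION
POST {BASE_URL}/projects/{projectCode}/executors/execute?processInstanceId={id}&executeType=STOP

Both calls are authenticated with a token header; tokens are created in Security Center -> Token Management.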

2. Python Script Automation

To streamline the process, I wrote a simple Python script.

Script name: dolpschedule-kill.py

# -*- coding: utf-8 -*-
# Note: this environment only has Python 2.7, so the script targets Python 2 rather than Python 3.
import requests

# Base API endpoint
BASE_URL = "http://XXX.XXX.XXX.XXX:12345/dolphinscheduler"

# Project code (can be found via DB query or in Project Management -> Project List)
PROJECT_CODE = "12194663850176"

# Token (created in Security Center -> Token Management)
token = "6bff15e17667d95fdffceda08a19cc6c"

# 1. Fetch running workflows
def get_running_tasks(token, pageNo=1, pageSize=10):
    headers = {"token": token}
    task_list_url = "{0}/projects/{1}/process-instances?pageNo={2}&pageSize={3}&stateType=RUNNING_EXECUTION".format(
        BASE_URL, PROJECT_CODE, pageNo, pageSize)
    resp = requests.get(task_list_url, headers=headers)
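    # The paginated payload lives under data.totalList; collect the workflow instance IDs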
    return [item['id'] for item in resp.json()['data']['totalList']]

# 2. Stop workflows in bulk
def batch_stop_tasks(token, task_ids):
    headers = {"token": token}
    for task_id in task_ids:
        stop_url = "{0}/projects/{1}/executors/execute?processInstanceId={2}&executeType=STOP".format(
            BASE_URL, PROJECT_CODE, task_id)
        resp = requests.post(stop_url, headers=headers)
        print("Task {0} stopped: {1}".format(task_id, resp.status_code))

# Main flow
if __name__ == "__main__":
    # Kill up to 100 tasks per execution
    running_tasks_ids = get_running_tasks(token, pageNo=1, pageSize=100)
    print("Found {0} running tasks".format(len(running_tasks_ids)))
    batch_stop_tasks(token, running_tasks_ids)
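
Note that the script pulls at most one page of 100 running instances per run (pageSize=100). If more workflows are stuck, simply re-run it until it reports that zero running tasks were found.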

3. Running the Script

python dolpschedule-kill.py
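
If everything is configured correctly, the console output follows the script's print statements and looks roughly like this (the instance IDs here are only illustrative):

Found 3 running tasks
Task 3021 stopped: 200
Task 3025 stopped: 200
Task 3037 stopped: 200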

4. Results

Each stop request returned HTTP 200, confirming that the workflow instances were terminated successfully.

Final Outcome

With this script, I was able to batch-kill all of the stuck workflow instances.

That said, sometimes individual task instances (not workflows) remain stuck. These cannot be terminated via the API. In those cases, you’ll need to manually fix them in the backend database. For reference, check out my earlier article: 6 High-Frequency SQL Operation Tips for DolphinScheduler.