Apache DolphinScheduler has already been deployed on a virtual machine.

Next, we want to try creating a Flink task node in DolphinScheduler and use Flink to consume data from Kafka.

Apache DolphinScheduler is deployed in standalone mode.

For detailed installation steps, please refer to the official documentation:

DolphinScheduler | Documentation Center

https://dolphinscheduler.apache.org/zh-cn/docs/3.3.2/guide/installation/standalone

Prerequisites

The following components have already been installed: JDK 11, Flink 1.18.1, Kafka, and ZooKeeper.

It is recommended to download the binary packages and install them directly on the virtual machine. Installing through a system package manager may pull in dependencies you cannot control.

1. Edit environment variables

vim ~/.bashrc

Add the Flink installation path:
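The lines to append might look like the sketch below. It assumes Flink is unpacked at /data/flink-1.18.1, the location used later in dolphinscheduler_env.sh; adjust the path to your own installation.

```shell
# Assumed install location; replace with your actual Flink directory
export FLINK_HOME=/data/flink-1.18.1
# Put the Flink scripts (flink, start-cluster.sh, ...) on the PATH
export PATH="$FLINK_HOME/bin:$PATH"
```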

2. Apply the configuration

# Reload environment variables
source ~/.bashrc

# Verify Flink environment variable
echo $FLINK_HOME

Since this setup runs inside a virtual machine, configuration changes are required so that services can be accessed from the host machine.

1. Modify Kafka configuration

Navigate to the config directory under the Kafka installation path and edit server.properties.

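The listeners and advertised.listeners settings must be modified so that Kafka can be reached from outside the VM. listeners controls the interface the broker binds to, while advertised.listeners is the address the broker hands back to clients. If the advertised address is localhost or a hostname the host machine cannot resolve, clients reach the bootstrap port and then fail on every subsequent request.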

broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
# Replace with your VM IP address
advertised.listeners=PLAINTEXT://192.168.146.132:9092
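Once the broker has been restarted with these settings, a quick check from the host machine is to test whether the advertised address accepts TCP connections. This is a minimal sketch that assumes bash and the coreutils timeout command are available; port_open is a hypothetical helper, and 192.168.146.132 is the example VM address from the configuration above.

```shell
# Hypothetical helper: succeeds if a TCP connection to host ($1) and port ($2)
# can be opened within 3 seconds (uses bash's /dev/tcp pseudo-device)
port_open() {
  timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Replace the address with your own VM IP
if port_open 192.168.146.132 9092; then
  echo "Kafka listener reachable"
else
  echo "Kafka listener NOT reachable"
fi
```

An open port only shows that the listener is bound and the firewall lets traffic through; it does not prove that the advertised address is correct, so a producer or consumer test from the host is still worthwhile.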

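2. Modify Flink configuration

Go to the conf directory under the Flink installation path and edit flink-conf.yaml.

Replace every localhost address with 0.0.0.0 so that the Flink Web UI can be reached from outside the VM. In the default Flink 1.18 configuration this also applies to rest.address and rest.bind-address, which control the Web UI and REST endpoint.

Additionally, adjust the JobManager and TaskManager memory settings.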

jobmanager.rpc.address: 0.0.0.0
jobmanager.bind-host: 0.0.0.0
jobmanager.cpu.cores: 1
jobmanager.memory.process.size: 1600m

taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.process.size: 2048m
taskmanager.cpu.cores: 1
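To confirm that no active localhost entry was missed, the file can be searched for uncommented lines that still mention it. This is only a sketch: remaining_localhost is a hypothetical helper, and the relative path assumes the command is run from the Flink installation directory.

```shell
# Hypothetical helper: print uncommented lines (with line numbers)
# that still contain "localhost"
remaining_localhost() {
  grep -nE '^[^#]*localhost' "$1"
}

# Assumed to run from the Flink installation directory
if [ -f conf/flink-conf.yaml ]; then
  remaining_localhost conf/flink-conf.yaml || echo "no active localhost entries left"
fi
```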


3. Modify Apache DolphinScheduler configuration

From the dolphinscheduler-daemon.sh startup script, we can see that DolphinScheduler loads environment variables from:

bin/env/dolphinscheduler_env.sh

Edit dolphinscheduler_env.sh and add Java and Flink paths:

# Replace with your actual Java and Flink paths
export JAVA_HOME=/data/jdk-11.0.29
export FLINK_HOME=/data/flink-1.18.1

Disable Firewall and Start Services

Start all required services, including ZooKeeper, Kafka, Flink, and Apache DolphinScheduler.

# Disable firewall
sudo systemctl stop firewalld

# Start Flink cluster (run from the Flink installation directory)
bin/start-cluster.sh

# Start ZooKeeper (this and the Kafka commands below run from the Kafka installation directory)
bin/zookeeper-server-start.sh config/zookeeper.properties &

# Start Kafka broker
bin/kafka-server-start.sh config/server.properties &

# Create Kafka topic
bin/kafka-topics.sh --create --topic test \
  --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1

# Produce messages (interactive; type one message per line)
bin/kafka-console-producer.sh --topic test \
  --bootstrap-server localhost:9092

# Consume messages (run in a separate terminal to check that messages arrive)
bin/kafka-console-consumer.sh --topic test \
  --from-beginning --bootstrap-server localhost:9092

# Start DolphinScheduler Standalone Server (run from the DolphinScheduler installation directory)
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

Verification

1. Verify Flink Web UI

Access the Flink dashboard at:

http://<VM-IP>:8081

2. Verify DolphinScheduler Web UI

Access DolphinScheduler at:

http://<VM-IP>:12345/dolphinscheduler/ui
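Both Web UIs can also be probed from the host's command line before opening a browser. The sketch below assumes curl is installed; VM_IP reuses the example address from the Kafka configuration, http_status is a hypothetical helper, and /overview is the Flink REST endpoint that returns a cluster summary.

```shell
# Assumed VM address; replace with your own
VM_IP=192.168.146.132

# Hypothetical helper: print only the HTTP status code for a URL
# (curl prints 000 when no connection could be made within 3 seconds)
http_status() {
  curl -s -o /dev/null -m 3 -w '%{http_code}' "$1"
}

echo "Flink:            $(http_status "http://$VM_IP:8081/overview")"
echo "DolphinScheduler: $(http_status "http://$VM_IP:12345/dolphinscheduler/ui")"
```

A 2xx or 3xx code suggests the service is up and reachable, while 000 usually points to a stopped service, a wrong bind address, or a firewall rule.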

Default credentials: username admin, password dolphinscheduler123

Sample Implementation

This example shows a Flink job that consumes data from Kafka, and how to package the job, upload it to DolphinScheduler, and run it as a Flink task.

1. Sample Code

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>flink-kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.18.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.6.0</kafka.version>
    </properties>

    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connector base dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Kafka connector (versioned separately from Flink core) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- Logging dependency -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
 
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache-releases</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkKafkaConsumerExample.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
 
 
public class FlinkKafkaConsumerExample {
    // Shared between the sink and main(). Note: static fields are per JVM, so
    // these flags are only visible to main() when the sink runs in the same
    // JVM (for example local execution), not in a separate TaskManager process.
    private static volatile int messageCount = 0;
    private static volatile boolean shouldStop = false;

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092"); // Kafka broker address
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // Consumer group
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Create the Kafka consumer for topic "test"
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        kafkaConsumer.setStartFromEarliest(); // Consume from the earliest messages
        DataStream<String> stream = env.addSource(kafkaConsumer);
 
        // Process data: tokenization and counting
        DataStream<Tuple2<String, Integer>> counts = stream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
 
 
        counts.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) {
                System.out.println(value);
                messageCount++;
 
                // Check whether the stop condition is met
                if (messageCount >= 2 && !shouldStop) {
                    System.out.println("Processed 2 messages, stopping job.");
                    shouldStop = true; // Set a flag to indicate that the job should stop
                }
            }
        });
 
        // Execute the job and obtain JobClient
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // Start the job and obtain JobClient
                org.apache.flink.core.execution.JobClient jobClient = env.executeAsync("Flink Kafka WordCount");
                System.out.println("Job ID: " + jobClient.getJobID());
 
                // Monitor the condition and cancel the job
                while (!shouldStop) {
                    Thread.sleep(100); // Check every 100 milliseconds
                }
 
                // Cancel the job when the stop condition is met
                if (shouldStop) {
                    System.out.println("Cancelling the job...");
                    jobClient.cancel().get(); // Cancel the job
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
 
        // Wait in the main thread until the monitoring task has finished
        future.join();
    }
 
    // Tokenizer Class for converting input strings into words
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
 
}

2. Package and Upload to DolphinScheduler
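The packaging step can be sketched as follows, assuming Maven is installed and the commands run from the project root next to pom.xml. shaded_jar is a hypothetical helper: maven-shade-plugin keeps the unshaded artifact as original-*.jar in target, so the helper skips that file and prints the shaded jar, which is the one to upload.

```shell
# Hypothetical helper: print the shaded jar in a build directory,
# skipping the "original-*.jar" that maven-shade-plugin leaves behind
shaded_jar() {
  ls "$1"/*.jar 2>/dev/null | grep -v '/original-' | head -n 1
}

# Build only when Maven and a pom.xml are actually present
if command -v mvn >/dev/null 2>&1 && [ -f pom.xml ]; then
  mvn -q clean package -DskipTests
  echo "Jar to upload: $(shaded_jar target)"
fi
```

The resulting jar can then be uploaded through the DolphinScheduler Resource Center and selected as the main jar of a Flink task node, with FlinkKafkaConsumerExample as the main class.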

Start the Kafka producer in the virtual machine and send messages.

Flink successfully consumes and processes the Kafka data.