Today, we focus on SeaTunnel’s self-developed data synchronization engine, called Zeta.

If you are using the Zeta engine, the first step is always to run the bin/seatunnel-cluster.sh script, which starts the Zeta server.

Opening seatunnel-cluster.sh, we can see that it actually launches the main() method in seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java.

This is the core startup method of Zeta.

As shown in the code below:

public class SeaTunnelServer {

    public static void main(String[] args) throws CommandException {

        ServerCommandArgs serverCommandArgs =
                CommandLineUtils.parse(
                        args,
                        new ServerCommandArgs(),
                        EngineType.SEATUNNEL.getStarterShellName(),
                        true); // acceptUnknownOptions

        SeaTunnel.run(serverCommandArgs.buildCommand());
    }
}

We should first look at the ServerCommandArgs class, which assembles the startup command from the command-line arguments. The entry point is serverCommandArgs.buildCommand().

@EqualsAndHashCode(callSuper = true)
@Data
public class ServerCommandArgs extends CommandArgs {

    @Parameter(
            names = {"-cn", "--cluster"},
            description = "The name of cluster"
    )
    private String clusterName;

    @Parameter(
            names = {"-d", "--daemon"},
            description = "The cluster daemon mode"
    )
    private boolean daemonMode = false;

    @Parameter(
            names = {"-r", "--role"},
            description =
                    "The cluster node role, default is master_and_worker, " +
                    "support master, worker, master_and_worker"
    )
    private String clusterRole;

    @Override
    public Command<?> buildCommand() {
        return new ServerExecuteCommand(this);
    }
}
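For reference, this kind of flag-to-field mapping can be hand-rolled; the ArgsSketch class below is a hypothetical stand-in for the JCommander-based parsing that ServerCommandArgs actually relies on, covering only the three flags shown above:

```java
// Minimal, illustrative sketch (not the real JCommander parser) of how
// the three flags on ServerCommandArgs map onto fields.
public class ArgsSketch {
    String clusterName;            // -cn / --cluster
    boolean daemonMode = false;    // -d  / --daemon
    String clusterRole;            // -r  / --role

    static ArgsSketch parse(String[] argv) {
        ArgsSketch parsed = new ArgsSketch();
        for (int i = 0; i < argv.length; i++) {
            switch (argv[i]) {
                case "-cn": case "--cluster": parsed.clusterName = argv[++i]; break;
                case "-d":  case "--daemon":  parsed.daemonMode = true;       break;
                case "-r":  case "--role":    parsed.clusterRole = argv[++i]; break;
                default: break; // acceptUnknownOptions: ignore anything else
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        ArgsSketch a = parse(new String[] {"-cn", "my-cluster", "-d", "-r", "worker"});
        System.out.println(a.clusterName + " daemon=" + a.daemonMode + " role=" + a.clusterRole);
    }
}
```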

Next, SeaTunnel.run() starts the SeaTunnelServer. The startup process can be summarized as follows:

Steps:

  1. Validate the current environment.
  2. Load SeaTunnel configuration.
  3. Set the node role: Master, Worker, or Master_And_Worker.
  4. Create a Hazelcast instance for cluster discovery, registration, and distributed data management.

These steps are implemented in ServerExecuteCommand.execute():
@Override
public void execute() {
    // Validate environment
    checkEnvironment();

    // Load SeaTunnel configuration
    SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    String clusterRole = this.serverCommandArgs.getClusterRole();

    // Set node role
    if (StringUtils.isNotBlank(clusterRole)) {
        if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
            seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
        } else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
            seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);
            // In Hazelcast lite node, it will not store IMap data.
            seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
        } else {
            throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
        }
    } else {
        seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
    }

    // Create Hazelcast instance for cluster discovery, registration, and distributed data management
    SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig, Thread.currentThread().getName());
}
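The role-resolution branch above can be condensed into a small, self-contained sketch. RoleResolution below is illustrative only, though the enum values mirror EngineConfig.ClusterRole, and like the real code it rejects anything other than master or worker when a role is passed explicitly:

```java
// Compact, illustrative sketch of the role-resolution logic in execute().
public class RoleResolution {
    enum ClusterRole { MASTER, WORKER, MASTER_AND_WORKER }

    static ClusterRole resolve(String clusterRole) {
        if (clusterRole == null || clusterRole.trim().isEmpty()) {
            // No -r flag: the node acts as both master and worker.
            return ClusterRole.MASTER_AND_WORKER;
        }
        for (ClusterRole role : ClusterRole.values()) {
            if (role.toString().equalsIgnoreCase(clusterRole)) {
                if (role == ClusterRole.MASTER_AND_WORKER) break; // not settable via -r
                return role;
            }
        }
        throw new IllegalArgumentException("Not supported cluster role: " + clusterRole);
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));      // MASTER_AND_WORKER
        System.out.println(resolve("Worker"));  // WORKER (case-insensitive)
    }
}
```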

Node Roles:

  1. Master Node: runs the coordination side of the cluster (job scheduling, checkpoints, monitoring).
  2. Worker Node: runs TaskExecutionService to execute tasks; it joins the cluster as a Hazelcast lite member and therefore stores no IMap data.
  3. Hybrid Node (Legacy Architecture): a single process acting as both master and worker (MASTER_AND_WORKER); this is the default when no role is specified.

Returning to Hazelcast instance creation:

The core code is HazelcastInstanceFactory.newHazelcastInstance(), which creates the Hazelcast instance.

private static HazelcastInstanceImpl initializeHazelcastInstance(
        @NonNull SeaTunnelConfig seaTunnelConfig, String customInstanceName) {

    // Set the default async executor for Hazelcast InvocationFuture
    ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR);

    // Check whether to enable metrics reporting
    boolean condition = checkTelemetryConfig(seaTunnelConfig);

    String instanceName = customInstanceName != null
            ? customInstanceName
            : HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig());

    // Create Hazelcast instance
    HazelcastInstanceImpl original = ((HazelcastInstanceProxy)
            HazelcastInstanceFactory.newHazelcastInstance(
                    seaTunnelConfig.getHazelcastConfig(),
                    instanceName,
                    new SeaTunnelNodeContext(seaTunnelConfig)))
            .getOriginal();

    // Initialize telemetry instance
    // Enable metrics reporting, including: JvmCollector, JobInfoDetail, ThreadPoolStatus, NodeMetrics, ClusterMetrics
    if (condition) {
        initTelemetryInstance(original.node);
    }

    return original;
}

The most important part here is new SeaTunnelNodeContext(seaTunnelConfig).

This returns a SeaTunnelNodeContext class, which extends Hazelcast’s DefaultNodeContext. During Hazelcast startup, it calls createNodeExtension(), which in this case is implemented in SeaTunnelNodeContext.

@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {

    private final SeaTunnelConfig seaTunnelConfig;

    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.seaTunnelConfig = seaTunnelConfig;
    }

    @Override
    public NodeExtension createNodeExtension(@NonNull Node node) {
        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
    }
}
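The pattern at work here is a factory hook: Hazelcast startup calls createNodeExtension() on whatever NodeContext it was handed, so overriding that single method is enough to plug in the Zeta engine. A minimal sketch of the same shape, with all names hypothetical:

```java
// Illustrative sketch of the factory-hook pattern Hazelcast uses: the
// startup code depends only on the base context's hook method, so a
// subclass can swap in engine-specific behavior.
public class HookSketch {
    interface Extension { String name(); }

    static class DefaultContext {
        Extension createExtension() {
            return () -> "default";
        }
    }

    // Analogous to SeaTunnelNodeContext: only the factory hook is overridden.
    static class ZetaContext extends DefaultContext {
        @Override
        Extension createExtension() {
            return () -> "zeta";
        }
    }

    static String boot(DefaultContext ctx) {
        // "Hazelcast startup": uses the hook, unaware of the subclass.
        return ctx.createExtension().name();
    }

    public static void main(String[] args) {
        System.out.println(boot(new DefaultContext())); // default
        System.out.println(boot(new ZetaContext()));    // zeta
    }
}
```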

Tracing into the node extension, we see that the Zeta engine is initialized here.

SeaTunnelServer implements a series of Hazelcast interfaces to listen for cluster state changes, including node initialization, member join/leave events, and distributed system operations.

Detailed operations:

  1. Node Initialization: provides a complete startup process, ensuring all services are initialized correctly. Key methods: startMaster() and startWorker(). The init() callback:
@Override
public void init(NodeEngine engine, Properties hzProperties) {
    this.nodeEngine = (NodeEngineImpl) engine;

    // Initialize class loader service
    classLoaderService = new DefaultClassLoaderService(
            seaTunnelConfig.getEngineConfig().isClassLoaderCacheMode(), nodeEngine);

    // Event service for processing and forwarding
    eventService = new EventService(nodeEngine);

    // Start Master / Worker capabilities
    if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
            == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
        startWorker();
        startMaster();
    } else if (EngineConfig.ClusterRole.WORKER.ordinal()
            == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
        startWorker();
    } else {
        startMaster();
    }

    // Health monitor
    seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());

    // Task log management
    if (seaTunnelConfig.getEngineConfig().getTelemetryConfig() != null
            && seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs() != null
            && seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs().isEnabled()) {
        taskLogManagerService = new TaskLogManagerService(
                seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs());
        taskLogManagerService.initClean();
    }

    // Jetty service: HTTP REST API, Web UI, job management, monitoring endpoints
    if (seaTunnelConfig.getEngineConfig().getHttpConfig().isEnabled()) {
        jettyService = new JettyService(nodeEngine, seaTunnelConfig);
        jettyService.createJettyServer();
    }

    // Fix Hadoop Statistics cleaner thread class-loader leak
    FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");
}
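A side note on the role dispatch in init(): comparing ordinal() values of the same enum type is equivalent to comparing the constants directly, so the branching can also be written as a switch. A hedged sketch with a stand-in enum and a list recording which services start (ClusterRole and the service names here are illustrative, not SeaTunnel code):

```java
import java.util.ArrayList;
import java.util.List;

// Same dispatch as init() above, written as a switch over the enum itself.
public class RoleDispatch {
    enum ClusterRole { MASTER, WORKER, MASTER_AND_WORKER }

    static List<String> start(ClusterRole role) {
        List<String> started = new ArrayList<>();
        switch (role) {
            case MASTER_AND_WORKER:
                started.add("worker"); // worker first, mirroring init()
                started.add("master");
                break;
            case WORKER:
                started.add("worker");
                break;
            case MASTER:
                started.add("master");
                break;
        }
        return started;
    }

    public static void main(String[] args) {
        System.out.println(start(ClusterRole.MASTER_AND_WORKER)); // [worker, master]
    }
}
```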

Master Node Startup (startMaster): Initializes CoordinatorService, CheckpointService, and the monitoring service.

private void startMaster() {
    coordinatorService = new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    checkpointService = new CheckpointService(seaTunnelConfig.getEngineConfig().getCheckpointConfig());

    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);
}
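The monitor thread is plain java.util.concurrent scheduling. A self-contained sketch of the same scheduleAtFixedRate pattern, with a latch standing in for printExecutionInfo() and the interval shortened for demonstration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal demo of the scheduleAtFixedRate pattern used by startMaster():
// a single-threaded scheduler fires a periodic task until shut down.
public class MonitorSketch {

    // Returns true once the periodic task has fired at least twice.
    static boolean firesAtLeastTwice(long periodMillis) {
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch firedTwice = new CountDownLatch(2);
        monitor.scheduleAtFixedRate(
                firedTwice::countDown, // stand-in for printExecutionInfo()
                0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return firedTwice.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            monitor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fired at least twice: " + firesAtLeastTwice(10));
    }
}
```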

Worker Node Startup (startWorker): Focuses on TaskExecutionService, which executes data processing tasks efficiently and reliably.

private void startWorker() {
    taskExecutionService = new TaskExecutionService(classLoaderService, nodeEngine, eventService);
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
}
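Conceptually, the worker's job is to accept tasks and run them. The stand-in below is deliberately simplified (all names hypothetical) and only captures the submit-and-await shape; the real TaskExecutionService additionally handles TaskGroup deployment, classloader isolation, metrics, and checkpointing:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, heavily simplified stand-in for a worker-side execution
// service: submit a named task and wait for its result.
public class TinyExecutionService {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    String submitAndWait(String taskName) {
        try {
            return pool.submit(() -> "done:" + taskName).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        TinyExecutionService svc = new TinyExecutionService();
        System.out.println(svc.submitAndWait("sync-job")); // done:sync-job
        svc.shutdown();
    }
}
```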

Core process of TaskExecutionService.start():

  2. Cluster Member Join/Leave:

Why memberAdded is empty:

memberRemoved responsibilities:

  3. Distributed System Operation Tracking:
@Override
public void populate(LiveOperations liveOperations) {
    // In SeaTunnelServer this implementation is empty,
    // indicating the current version has no special operations to track.
}

Local IDEA startup note: since main() in SeaTunnelServer is the entry point, you can also start the server locally by running that method directly from IDEA.

Finally, visit localhost:8080 to check the service status.
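Assuming the default HTTP port served by the Jetty service mentioned above, the same check can be scripted. The endpoint path below is just the root and is an assumption; adjust it to whatever your deployment exposes:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Probe the (assumed) default Zeta HTTP port from code.
public class HealthProbe {
    static HttpRequest buildProbe(String host, int port) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + port + "/"))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest probe = buildProbe("localhost", 8080);
        try {
            HttpResponse<String> resp = HttpClient.newHttpClient()
                    .send(probe, HttpResponse.BodyHandlers.ofString());
            System.out.println("status: " + resp.statusCode());
        } catch (IOException | InterruptedException e) {
            // Expected when no server is running locally.
            System.out.println("server not reachable: " + e.getMessage());
        }
    }
}
```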