Status

Current state:

  •  Discussion

  •  Accepted

  •  Rejected

Discussion threads: <Discussion link>

ISSUE: https://github.com/apache/seatunnel/issues/6672

Motivation  

At present, the Zeta engine does not distinguish between master nodes and worker nodes. Every node can act as both master and worker, and exactly one node in the cluster holds the master role at any given time. The master receives jobs submitted by clients and schedules their tasks, and it also runs tasks itself.

When many tasks are running in the cluster, the load on the master node can become very high, which may cause the master to hang and trigger a master failover. The failover puts further load on the cluster, and the newly elected master is even more likely to fail because it takes over the same excessive load.

The goal of this proposal is to separate the roles of master and worker nodes. The master node is responsible only for task scheduling and for storing and accessing IMap data. Worker nodes are responsible only for running tasks; they do not store or access IMap data, and they do not participate in master elections.

Core Update

Hazelcast Cluster Manager Change  

In Hazelcast, there is a type of node called a lite node, which is used only for computation in the Hazelcast cluster and does not store any IMap data. This partly meets our requirement that worker nodes only compute and do not store IMap data. However, both lite nodes and non-lite nodes participate in master election and can be elected as the master node. Zeta relies on the master node flag to control the startup and initialization of the 'CoordinatorService' service, so once a lite node is elected as master, 'CoordinatorService' automatically fails over to that node. The node then handles both job scheduling and task execution, which defeats the design goal.

To achieve our goal, we need to modify the Hazelcast source code and rewrite the node election logic so that lite nodes cannot be elected as master.

Reading the Hazelcast source code shows that the master node is selected in two places:

1. During cluster startup, the Joiner's 'doJoin' method selects the master node.

2. After the cluster has started successfully, the 'handleMastershipClaim' method is called when the master node fails and a new master is elected.

Change Election When Cluster Starts

In org.apache.seatunnel.engine.server.SeaTunnelNodeContext, we can override the public Joiner createJoiner(Node node) method and return a custom Joiner implementation, LiteNodeDropOutTcpIpJoiner.

// Lombok's @Slf4j supplies the `log` field used in createJoiner
@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {

    private final SeaTunnelConfig seaTunnelConfig;

    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.seaTunnelConfig = seaTunnelConfig;
    }

    @Override
    public NodeExtension createNodeExtension(@NonNull Node node) {
        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
    }

    @Override
    public Joiner createJoiner(Node node) {
        JoinConfig join =
                getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
        join.verify();

        if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) {
            log.info("Using Multicast discovery");
            return new MulticastJoiner(node);
        } else if (join.getTcpIpConfig().isEnabled()) {
            log.info("Using TCP/IP discovery");
            return new LiteNodeDropOutTcpIpJoiner(node);
        } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
                || isAnyAliasedConfigEnabled(join)
                || join.isAutoDetectionEnabled()) {
            log.info("Using Discovery SPI");
            return new DiscoveryJoiner(node, node.discoveryService, usePublicAddress(join, node));
        }
        return null;
    }

    private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
        return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
    }

    private boolean usePublicAddress(JoinConfig join, Node node) {
        return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
                || allUsePublicAddress(
                        AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
    }
}

LiteNodeDropOutTcpIpJoiner extends TcpIpJoiner. Because most methods in TcpIpJoiner are private, we copy the code from TcpIpJoiner into LiteNodeDropOutTcpIpJoiner and then modify it.

    private void joinViaPossibleMembers() {
        try {
            Collection<Address> possibleAddresses = getPossibleAddressesForInitialJoin();

            long maxJoinMillis = getMaxJoinMillis();
            long startTime = Clock.currentTimeMillis();

            while (shouldRetry() && (Clock.currentTimeMillis() - startTime < maxJoinMillis)) {
                tryJoinAddresses(possibleAddresses);

                if (clusterService.isJoined()) {
                    return;
                }

                // update for seatunnel, lite member can not become master node
                if (isAllBlacklisted(possibleAddresses) && !node.isLiteMember()) {
                    logger.fine(
                            "This node will assume master role since none of the possible members accepted join request.");
                    clusterJoinManager.setThisMemberAsMaster();
                    return;
                }

                if (tryClaimMastership(possibleAddresses)) {
                    return;
                }

                clusterService.setMasterAddressToJoin(null);
            }
        } catch (Throwable t) {
            logger.severe(t);
        }
    }

    private boolean isThisNodeMasterCandidate(Collection<Address> addresses) {
        // update for seatunnel, lite node can not become master node.
        if (node.isLiteMember()) {
            return false;
        }
        int thisHashCode = node.getThisAddress().hashCode();
        for (Address address : addresses) {
            if (isBlacklisted(address)) {
                continue;
            }
            if (node.getServer().getConnectionManager(MEMBER).get(address) != null
                    && node.getClusterService().getMember(address) != null
                    && !node.getClusterService().getMember(address).isLiteMember()) {
                LocalAddressRegistry addressRegistry = node.getLocalAddressRegistry();
                UUID memberUuid = addressRegistry.uuidOf(address);
                if (memberUuid != null) {
                    Address primaryAddress = addressRegistry.getPrimaryAddress(memberUuid);
                    if (primaryAddress != null) {
                        if (thisHashCode > primaryAddress.hashCode()) {
                            return false;
                        }
                    }
                }
            }
        }
        return true;
    }
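
The candidate rule in isThisNodeMasterCandidate can be illustrated without any Hazelcast types. The following is only a minimal sketch: the `Peer` class and the `isMasterCandidate` helper are hypothetical names introduced for illustration, and the sketch leaves out the blacklist, connection, and address-registry checks that the real method performs.

```java
import java.util.Arrays;
import java.util.List;

final class StartupCandidateSketch {

    /** Hypothetical stand-in for a cluster member; not a Hazelcast type. */
    static final class Peer {
        final int addressHash;
        final boolean lite;

        Peer(int addressHash, boolean lite) {
            this.addressHash = addressHash;
            this.lite = lite;
        }
    }

    /**
     * Returns true when the local node may claim mastership at startup:
     * it must not be a lite member, and no non-lite peer may have a smaller address hash.
     */
    static boolean isMasterCandidate(Peer self, List<Peer> peers) {
        if (self.lite) {
            return false; // lite (worker) nodes never claim mastership
        }
        for (Peer peer : peers) {
            if (peer.lite) {
                continue; // lite peers do not compete for mastership
            }
            if (self.addressHash > peer.addressHash) {
                return false; // a non-lite peer with a smaller hash takes precedence
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Peer> peers = Arrays.asList(new Peer(10, true), new Peer(50, false));
        System.out.println(isMasterCandidate(new Peer(30, false), peers));
        System.out.println(isMasterCandidate(new Peer(5, true), peers));
    }
}
```

In this sketch a lite peer with a smaller hash does not block a non-lite node, which mirrors the extra isLiteMember() condition added to the copied method.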

Master Election When Master Node Shuts Down

Because this change cannot be merged into the upstream Hazelcast project, we adopt the same approach as seatunnel-config: use the shade plugin to exclude the original Hazelcast classes, and then add our rewritten classes to the new jar.

1. Add module: seatunnel-hazelcast-base

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.seatunnel</groupId>
        <artifactId>seatunnel-hazelcast</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>seatunnel-hazelcast-base</artifactId>

    <properties>
        <!--  SeaTunnel Engine use     -->
        <hazelcast.version>5.1</hazelcast.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>${hazelcast.version}</version>
        </dependency>
    </dependencies>

    <build>

        <finalName>${project.artifactId}-${project.version}</finalName>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <minimizeJar>true</minimizeJar>
                    <createSourcesJar>true</createSourcesJar>
                    <shadeSourcesContent>true</shadeSourcesContent>
                    <shadedArtifactAttached>false</shadedArtifactAttached>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <!-- the excluded classes below belong to the hazelcast artifact -->
                            <artifact>com.hazelcast:hazelcast</artifact>
                            <includes>
                                <include>**</include>
                            </includes>
                            <excludes>
                                <exclude>META-INF/MANIFEST.MF</exclude>
                                <exclude>META-INF/NOTICE</exclude>
                                <exclude>com/hazelcast/internal/cluster/impl/MembershipManager.class</exclude>
                                <exclude>com/hazelcast/internal/cluster/impl/MemberMap.class</exclude>
                                <exclude>com/hazelcast/internal/cluster/impl/ClusterServiceImpl.class</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer" />
                    </transformers>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <phase>package</phase>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>compile</id>
                        <goals>
                            <goal>attach-artifact</goal>
                        </goals>
                        <phase>package</phase>
                        <configuration>
                            <artifacts>
                                <artifact>
                                    <file>${basedir}/target/${project.artifactId}-${project.version}.jar</file>
                                    <type>jar</type>
                                    <classifier>optional</classifier>
                                </artifact>
                            </artifacts>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>

2. Add module: seatunnel-hazelcast-shade


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.seatunnel</groupId>
        <artifactId>seatunnel-hazelcast</artifactId>
        <version>${revision}</version>
    </parent>
    <artifactId>seatunnel-hazelcast-shade</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-hazelcast-base</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>
    <build>

        <finalName>${project.artifactId}-${project.version}</finalName>

        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>compile</id>
                        <goals>
                            <goal>attach-artifact</goal>
                        </goals>
                        <phase>package</phase>
                        <configuration>
                            <artifacts>
                                <artifact>
                                    <file>${basedir}/target/${project.artifactId}-${project.version}.jar</file>
                                    <type>jar</type>
                                    <classifier>optional</classifier>
                                </artifact>
                            </artifacts>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
</project>

3. Rewrite class com.hazelcast.internal.cluster.impl.ClusterServiceImpl

    // called under cluster service lock
    // mastership is accepted when every member before the candidate is suspected or is a lite node
    private boolean shouldAcceptMastership(MemberMap memberMap, MemberImpl candidate) {
        assert lock.isHeldByCurrentThread() : "Called without holding cluster service lock!";
        for (MemberImpl member : memberMap.headMemberSet(candidate, false)) {
            // update for seatunnel, lite member can not become master node
            if (!member.isLiteMember() && !membershipManager.isMemberSuspected(member)) {
                if (logger.isFineEnabled()) {
                    logger.fine(
                            "Should not accept mastership claim of "
                                    + candidate
                                    + ", because "
                                    + member
                                    + " is not suspected at the moment and is before than "
                                    + candidate
                                    + " in the member list.");
                }

                return false;
            }
        }
        return true;
    }
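
The acceptance rule above can be tried out on a plain list. This is a minimal sketch under stated assumptions: `Member` is a hypothetical model class, the member list is assumed to be ordered the same way as Hazelcast's member list, and suspicion is modelled as a set of names instead of a call to membershipManager.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class MastershipAcceptSketch {

    /** Hypothetical member model; not a Hazelcast type. */
    static final class Member {
        final String name;
        final boolean lite;

        Member(String name, boolean lite) {
            this.name = name;
            this.lite = lite;
        }
    }

    /**
     * Accepts the claim only if every member listed before the candidate
     * is either a lite member or currently suspected.
     */
    static boolean shouldAcceptMastership(
            List<Member> orderedMembers, Set<String> suspected, String candidate) {
        for (Member member : orderedMembers) {
            if (member.name.equals(candidate)) {
                return true; // reached the candidate, nobody ahead of it blocks the claim
            }
            if (!member.lite && !suspected.contains(member.name)) {
                return false; // a healthy non-lite member is ahead of the candidate
            }
        }
        throw new IllegalArgumentException(candidate + " is not in the member list");
    }

    public static void main(String[] args) {
        List<Member> members = Arrays.asList(
                new Member("m1", false), new Member("w1", true), new Member("m2", false));
        Set<String> suspected = new HashSet<>(Collections.singletonList("m1"));
        System.out.println(shouldAcceptMastership(members, suspected, "m2"));
    }
}
```

Here the lite member w1 sits ahead of m2 but does not block the claim, whereas an unsuspected m1 would.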

4. Rewrite class com.hazelcast.internal.cluster.impl.MemberMap

    Set<MemberImpl> tailMemberSet(MemberImpl member, boolean inclusive) {
        ensureMemberExist(member);

        Set<MemberImpl> result = new LinkedHashSet<>();
        boolean found = false;
        for (MemberImpl m : members) {
            // update for seatunnel
            // all lite member need add to new cluster
            if (m.isLiteMember()) {
                result.add(m);
                continue;
            }

            if (!found && m.equals(member)) {
                found = true;
                if (inclusive) {
                    result.add(m);
                }
                continue;
            }

            if (found) {
                result.add(m);
            }
        }

        assert found : member + " should have been found!";

        return result;
    }
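
To see the effect of the modified tailMemberSet, the same loop can be run over plain member names. This is a minimal sketch: the `tailMembers` helper and the name-based model are hypothetical, and the pivot is assumed to be a non-lite member, as it is when a master candidate is passed in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TailMemberSetSketch {

    /**
     * Returns the members that follow the pivot in list order, plus every lite member
     * regardless of its position, so that all lite members are carried into the new view.
     */
    static List<String> tailMembers(
            List<String> ordered, Set<String> liteNames, String pivot, boolean inclusive) {
        List<String> result = new ArrayList<>();
        boolean found = false;
        for (String name : ordered) {
            if (liteNames.contains(name)) {
                result.add(name); // lite members are always kept
                continue;
            }
            if (!found && name.equals(pivot)) {
                found = true;
                if (inclusive) {
                    result.add(name);
                }
                continue;
            }
            if (found) {
                result.add(name);
            }
        }
        if (!found) {
            throw new IllegalArgumentException(pivot + " should have been found");
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ordered = Arrays.asList("m1", "w1", "m2", "w2", "m3");
        Set<String> lite = new HashSet<>(Arrays.asList("w1", "w2"));
        System.out.println(tailMembers(ordered, lite, "m2", false));
    }
}
```

With the unmodified logic, w1 would be dropped because it precedes m2; in this sketch it stays in the result.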

5. Rewrite class com.hazelcast.internal.cluster.impl.MembershipManager

    // add for seatunnel
    public boolean allNodeIsLite() {
        MemberMap memberMap = memberMapRef.get();
        for (MemberImpl member : memberMap.getMembers()) {
            if (!member.isLiteMember() && !suspectedMembers.contains(member)) {
                return false;
            }
        }
        return true;
    }

    void suspectMember(MemberImpl suspectedMember, String reason, boolean closeConnection) {
        assert !suspectedMember.equals(getLocalMember()) : "Cannot suspect from myself!";
        assert !suspectedMember.localMember() : "Cannot be local member";

        final MemberMap localMemberMap;
        final Set<MemberImpl> membersToAsk;

        clusterServiceLock.lock();
        try {
            if (!clusterService.isJoined()) {
                if (logger.isFineEnabled()) {
                    logger.fine(
                            "Cannot handle suspect of "
                                    + suspectedMember
                                    + " because this node is not joined...");
                }

                return;
            }

            ClusterJoinManager clusterJoinManager = clusterService.getClusterJoinManager();
            if ((clusterService.isMaster() && !clusterJoinManager.isMastershipClaimInProgress())) {
                removeMember(suspectedMember, reason, closeConnection);
                return;
            }

            if (!addSuspectedMember(suspectedMember, reason, closeConnection)) {
                return;
            }

            // update for seatunnel
            if (node.isLiteMember() && allNodeIsLite()) {
                logger.severe("All node is lite node, shutdown this cluster");
                node.shutdown(true);
            }

            if (!tryStartMastershipClaim()) {
                return;
            }

            localMemberMap = getMemberMap();
            membersToAsk = collectMembersToAsk(localMemberMap);
            logger.info(
                    "Local "
                            + localMemberMap.toMembersView()
                            + " with suspected members: "
                            + suspectedMembers
                            + " and initial addresses to ask: "
                            + membersToAsk);
        } finally {
            clusterServiceLock.unlock();
        }

        ExecutorService executor =
                nodeEngine.getExecutionService().getExecutor(MASTERSHIP_CLAIM_EXECUTOR_NAME);
        executor.submit(new DecideNewMembersViewTask(localMemberMap, membersToAsk));
    }
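
The shutdown condition added to suspectMember can be checked in isolation. The sketch below is an assumption-laden simplification: `Member`, `allRemainingAreLite`, and `shouldShutDown` are hypothetical names, and the suspected set stands in for the suspectedMembers field.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class AllLiteShutdownSketch {

    /** Hypothetical member model; not a Hazelcast type. */
    static final class Member {
        final String name;
        final boolean lite;

        Member(String name, boolean lite) {
            this.name = name;
            this.lite = lite;
        }
    }

    /** True when no healthy (unsuspected) non-lite member is left in the view. */
    static boolean allRemainingAreLite(List<Member> members, Set<String> suspected) {
        for (Member member : members) {
            if (!member.lite && !suspected.contains(member.name)) {
                return false;
            }
        }
        return true;
    }

    /** A lite node shuts itself down once no master-capable member remains. */
    static boolean shouldShutDown(boolean localIsLite, List<Member> members, Set<String> suspected) {
        return localIsLite && allRemainingAreLite(members, suspected);
    }

    public static void main(String[] args) {
        List<Member> members = Arrays.asList(
                new Member("m1", false), new Member("w1", true), new Member("w2", true));
        Set<String> suspected = new HashSet<>(Collections.singletonList("m1"));
        System.out.println(shouldShutDown(true, members, suspected));
    }
}
```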

SeaTunnel Zeta Engine Change

1. Add cluster node roles to differentiate node types. Add the option clusterRole to org.apache.seatunnel.engine.common.config.EngineConfig. The option can only be set in code.

    private ClusterRole clusterRole;

    public enum ClusterRole {
        MASTER_AND_WORKER,
        MASTER,
        WORKER
    }

2. Update seatunnel-cluster.sh and add the -r argument, which specifies the role of the cluster node. If the argument is omitted (as in the previous startup method), the role of the node is MASTER_AND_WORKER, which means that it can both be elected as master and run jobs.


seatunnel-cluster.sh -d

seatunnel-cluster.sh -d -r master

seatunnel-cluster.sh -d -r worker

3. Update class org.apache.seatunnel.core.starter.seatunnel.command.ServerExecuteCommand

public class ServerExecuteCommand implements Command<ServerCommandArgs> {

    private final ServerCommandArgs serverCommandArgs;

    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
        this.serverCommandArgs = serverCommandArgs;
    }

    @Override
    public void execute() {
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        String clusterRole = this.serverCommandArgs.getClusterRole();
        if (StringUtils.isNotBlank(clusterRole)) {
            if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
            } else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);

                // in hazelcast lite node will not store IMap data.
                seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
            } else {
                throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
            }
        } else {
            seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
        }

        HazelcastInstanceFactory.newHazelcastInstance(
                seaTunnelConfig.getHazelcastConfig(),
                Thread.currentThread().getName(),
                new SeaTunnelNodeContext(seaTunnelConfig));
    }
} 
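
The mapping from the -r argument to a role and to the Hazelcast lite-member flag can be summarised in a small helper. This is only a sketch of one possible shape: `parse` and `isLiteMember` are hypothetical names, the enum is redeclared locally so the example is self-contained, and IllegalArgumentException stands in for SeaTunnelEngineException.

```java
final class ClusterRoleParserSketch {

    /** Local copy of the roles listed in EngineConfig, redeclared for a self-contained example. */
    enum ClusterRole {
        MASTER_AND_WORKER,
        MASTER,
        WORKER
    }

    /** Maps the raw -r value to a role; a missing or blank value keeps the old combined behavior. */
    static ClusterRole parse(String arg) {
        if (arg == null || arg.trim().isEmpty()) {
            return ClusterRole.MASTER_AND_WORKER;
        }
        if (ClusterRole.MASTER.name().equalsIgnoreCase(arg)) {
            return ClusterRole.MASTER;
        }
        if (ClusterRole.WORKER.name().equalsIgnoreCase(arg)) {
            return ClusterRole.WORKER;
        }
        throw new IllegalArgumentException("Not supported cluster role: " + arg);
    }

    /** Only pure workers are started as Hazelcast lite members, so they hold no IMap data. */
    static boolean isLiteMember(ClusterRole role) {
        return role == ClusterRole.WORKER;
    }

    public static void main(String[] args) {
        ClusterRole role = parse(args.length > 0 ? args[0] : null);
        System.out.println(role + " liteMember=" + isLiteMember(role));
    }
}
```

Keeping the parsing in one place makes the default (no -r argument) explicit and easy to cover in a unit test.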


4. Update the init method in class org.apache.seatunnel.engine.server.SeaTunnelServer to start different services based on the node role

    public void init(NodeEngine engine, Properties hzProperties) {
        this.nodeEngine = (NodeEngineImpl) engine;
        // TODO Determine whether to execute there method on the master node according to the deploy
        // type

        classLoaderService =
                new DefaultClassLoaderService(
                        seaTunnelConfig.getEngineConfig().isClassloaderCacheMode());

        EngineConfig.ClusterRole clusterRole = seaTunnelConfig.getEngineConfig().getClusterRole();
        if (clusterRole == EngineConfig.ClusterRole.MASTER_AND_WORKER) {
            // combined node: start the task execution services and the coordination services
            taskExecutionService =
                    new TaskExecutionService(
                            classLoaderService, nodeEngine, nodeEngine.getProperties());
            nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
            taskExecutionService.start();
            getSlotService();
            coordinatorService =
                    new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
            monitorService = Executors.newSingleThreadScheduledExecutor();
            monitorService.scheduleAtFixedRate(
                    this::printExecutionInfo,
                    0,
                    seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
                    TimeUnit.SECONDS);

        } else if (clusterRole == EngineConfig.ClusterRole.WORKER) {
            // worker node: only start the services needed to run tasks
            taskExecutionService =
                    new TaskExecutionService(
                            classLoaderService, nodeEngine, nodeEngine.getProperties());
            nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
            taskExecutionService.start();
            getSlotService();
        } else {
            // master node: only start the services needed to schedule and monitor jobs
            coordinatorService =
                    new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
            monitorService = Executors.newSingleThreadScheduledExecutor();
            monitorService.scheduleAtFixedRate(
                    this::printExecutionInfo,
                    0,
                    seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
                    TimeUnit.SECONDS);
        }

        seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());

        // a trick way to fix StatisticsDataReferenceCleaner thread class loader leak.
        // see https://issues.apache.org/jira/browse/HADOOP-19049
        FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");
    }
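
The role-to-service mapping implemented by init can also be written down as data, which makes it straightforward to assert in a test. The following is a minimal sketch: the `Service` labels and the `servicesFor` helper are hypothetical and only name the groups of services started in each branch.

```java
import java.util.EnumSet;
import java.util.Set;

final class RoleServicesSketch {

    /** Local copy of the roles listed in EngineConfig, redeclared for a self-contained example. */
    enum ClusterRole {
        MASTER_AND_WORKER,
        MASTER,
        WORKER
    }

    /** Hypothetical labels for the services started in init. */
    enum Service {
        TASK_EXECUTION,
        SLOT,
        COORDINATOR,
        MONITOR
    }

    /** Returns the services a node with the given role is expected to start. */
    static Set<Service> servicesFor(ClusterRole role) {
        EnumSet<Service> services = EnumSet.noneOf(Service.class);
        if (role != ClusterRole.MASTER) {
            // every node that runs tasks needs the execution and slot services
            services.add(Service.TASK_EXECUTION);
            services.add(Service.SLOT);
        }
        if (role != ClusterRole.WORKER) {
            // every node that may become master needs the coordinator and the monitor
            services.add(Service.COORDINATOR);
            services.add(Service.MONITOR);
        }
        return services;
    }

    public static void main(String[] args) {
        for (ClusterRole role : ClusterRole.values()) {
            System.out.println(role + " -> " + servicesFor(role));
        }
    }
}
```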

Compatibility, Deprecation, and Migration Plan

Compatibility

  • The existing cluster startup method remains supported; this proposal only adds new startup options.

Deprecation

Migration Plan  

Test Plan

Test coverage scenarios  

  • Add e2e tests that run tasks on a cluster started with the new startup method, covering the following:

org.apache.seatunnel.engine.e2e.CheckpointEnableIT
org.apache.seatunnel.engine.e2e.ClusterFaultToleranceIT
org.apache.seatunnel.engine.e2e.ClusterFaultToleranceTwoPipelineIT

Rejected Alternatives

Risk  

In the new cluster mode, if only WORKER nodes are left in the cluster and no new master node can be elected, all WORKER nodes in the cluster will be shut down.
