You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 25 Next »

Design details and discussion for KNOX-88 

 Definition 

Knox HA is a set of routines for transparent work with Hadoop service that stands in HA mode.

 Purpose of Knox HA service

  1. Automatic failover. (Example: switch request from not responding name-node to active name-node.)
  2. Pluggable support of failover strategies.
  3. Daemon-service for regular ping of Hadoop service state (Performance optimization to keeping actual state of service).

Architecture

 

New provider will be added (descendant ProviderDeploymentContributorBase class) with a set of filters. See Pic.#1 for common architecture.

Pic. #1 – Providers architecture

 

Definition.

Alias – set of Hadoop name-nodes configured for High Availability mode.

 

Definition.

High Availability Strategy – plan of defining active name-node and switching between active and stand-by name-nodes. Strategy may contain such parameters as retryCount and timeoutInterval. See Pic.#2 for class diagram for HA mode.

 

Pic.#2 Class diagram for HA mode.

 

 

See Table #1 for class description.

Table #1. – HA mode new classes description.

#Class nameDescription
1HaUrlRewriteFunctionDescriptorDescribes function that resolves URLs in HA mode
2HaUrlRewriteFunctionProcessorImplements main logic of defining active or standby URL
3HaBaseStrategyHostMapperImplements base strategy for HA mode. Contains parameters: retryCount, timeoutInterval.

 

See Pic.#2 for  UML sequence diagram for UrlRewriteProcessor.


Pic #3 – UML sequence diagram for UrlRewriteProcessor.

Provider configuration example

 

Enables or disables HA Provider and binds strategy and provider together. Alias contains list of Hadoop services (name-nodes in our case: active and standby) grouped into one entity.

 

Topology
<topology>
  <gateway>
    ...
    <provider>
       <role>ha</role>
       <name>HAProvider</name>
       <param>
           <name>webhdfs.ha</name>
           <value>failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true</value>
       </param>
       <param>
           <name>namenode.ha</name>
           <value>failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true</value>
       </param>
    </provider>
    ...
  <gateway>
  ...
  <service>
     <role>WEBHDFS</role>
     <url>machine1.example.com:50070</url>
     <url>machine2.example.com:50070</url>
  </service>
  ...
  <service>
     <role>NAMENODE</role>
     <url>machine1.example.com:8020</url>
     <url>machine2.example.com:8020</url>
  </service>
...
</topology>

Parameters description:

  • failover_strategy – indicates how to define active service and contains some configuration parameters. Default value is BaseStrategy. BaseStrategy for failover has following parameters:
  • retryCount – indicates how many times knox will ping name-node before  knox decides that namenode is down.
  • timeoutInterval – interval for connection timeout. 
  • enabled – indicates whether  HAProvider  is active or not for service.

 

Example UML

Diagram Title DeploymentFactory(df)

Example Code Block

HaBaseStrategyHostMapper
public class HaBaseStrategyHostMapper implements HostMapper {

    @Override
    public String resolveInboundHostName(String inboundHost) {
 		//TODO: implement host resolution here
        return null;
    }
    @Override
    public String resolveOutboundHostName(String outboundHost) {
		//TODO: implement host resolution here
        return null;
    }
}
HaUrlRewriteFunctionDescriptor
public class HaUrlRewriteFunctionDescriptor implements UrlRewriteFunctionDescriptor<HaUrlRewriteFunctionDescriptor> {
    public static final String FUNCTION_NAME = "ha-rewrite";
    private String configLocation;
    @Override
    public String name() {
        return FUNCTION_NAME;
    }
    public HaUrlRewriteFunctionDescriptor config( String configLocation ) {
        this.configLocation = configLocation;
        return this;
    }
    public String config() {
        return configLocation;
    }
    public String getConfig() {
        return config();
    }
    public void setConfig( String configLocation ) {
        config( configLocation );
    }
}
HaUrlRewriteFunctionProcessor
public class HaUrlRewriteFunctionProcessor implements UrlRewriteFunctionProcessor<HaUrlRewriteFunctionDescriptor> {
    private HostMapperService hostMapperService;
    private HostMapper hostMapper = null;
    private String clusterName;

    @Override
    public String name() {
        return HaUrlRewriteFunctionDescriptor.FUNCTION_NAME;
    }
    @Override
    public void initialize(UrlRewriteEnvironment environment, HaUrlRewriteFunctionDescriptor descriptor) throws Exception {
        hostMapper = new HaBaseStrategyHostMapper();
        clusterName = environment.getAttribute(  GatewayServices.GATEWAY_CLUSTER_ATTRIBUTE );
        GatewayServices services = environment.getAttribute( GatewayServices.GATEWAY_SERVICES_ATTRIBUTE );
        if( clusterName != null && services != null ) {
            hostMapperService = services.getService( GatewayServices.HOST_MAPPING_SERVICE );
            if( hostMapperService != null ) {
                hostMapperService.registerHostMapperForCluster( clusterName, hostMapper );
            }
        }
    }
    @Override
    public void destroy() throws Exception {
        if( hostMapperService != null && clusterName != null ) {
            hostMapperService.removeHostMapperForCluster( clusterName );
        }
    }
    @Override
    public List<String> resolve(UrlRewriteContext context, List<String> parameters) throws Exception {
        List<String> result = null;
        if( parameters != null ) {
            result = new ArrayList<String>( parameters.size() );
            for( String parameter : parameters ) {
                switch( context.getDirection() ) {
                    case IN:
                        parameter = hostMapper.resolveInboundHostName( parameter );
                        break;
                    case OUT:
                        parameter = hostMapper.resolveOutboundHostName( parameter );
                        break;
                }
                result.add( parameter );
            }
        }
        return result;
    }
}

 

Managing multiple requests and thread safety

 

See Pic#4 for time diagramm for thread safe HA processing.

Pic#4 - Time diagramm for thread safe HA processing

 

  1. Suppose we have two requests (Request#1 and Request#2). Request#1 starts for processing earlier than Request#2.
  2. Request#1 ends with timeout exception. This triggers failover. Class HaActiveServiceResolver has method resetStateToFailOver() which switches next URL in the list defined in <service> tag according to strategy.
  3. While switching in HaActiveServiceResolver is happening no one thread can perform a switch because resetStateToFailOver() is synchronized.
  4. Using getActiveService() method in HaActiveServiceResolver threads can get URL.

 

See Class diagramm for HaActiveServiceResolver.

Pic#5 - Class diagramm 

HaActiveServiceResolver FailoverStrategy getActiveServiceIndex() HaBaseStrategy HaActiveServiceResolver currentServiceIndex: int strategy: FailoverStrategy resetStateToFilover() getActiveService() Performs failover operation «uses»

 

See initial implementattion of HaActiveServiceResolver.

HaBaseStrategyHostMapper
public class HaActiveServiceResolver {
    private int currentServiceIndex;
    private Map<ServiceRole, List<String>> servicesMap = new HashMap<ServiceRole, List<String>>();
    private static HaActiveServiceResolver instance = new HaActiveServiceResolver();
    private FailoverStrategy strategy;
    private HaActiveServiceResolver(){
        //TODO: initialize services here with data from xml file
        servicesMap.put(ServiceRole.WEBHDFS, new ArrayList<String>());
        servicesMap.put(ServiceRole.NAMENODE, new ArrayList<String>());
        strategy = new HaBaseStrategy();
        currentServiceIndex = 0;
    }
    public  synchronized void resetStateToFailOver(ServiceRole serviceRole){
        currentServiceIndex = strategy.getActiveServiceIndex(instance.currentServiceIndex, instance.servicesMap.get(serviceRole));
    }
    public static HaActiveServiceResolver getInstance(){
        return instance;
    }
    public String getActiveService(ServiceRole serviceRole){
        return servicesMap.get(serviceRole).get(instance.currentServiceIndex);
    }
}

 

See initial implementattion HaBaseStrategy.

HaBaseStrategyHostMapper
public class HaBaseStrategy implements FailoverStrategy {
    @Override
    public int getActiveServiceIndex(int currentIndex, List<String> services) {
        int newIndex = currentIndex + 1;
        if(newIndex > services.size()){
            newIndex = 0;
        }
        return newIndex;
    }
}
  • No labels