
KNOX-88: Support HDFS HA

Provider configuration example

 

The provider configuration enables or disables the HA Provider and binds the failover strategy to each service. Each param names a Hadoop service (in our case the NameNodes: active and standby) and groups its failover settings into one entry.

 

Code Block
languagexml
titleTopology
linenumberstrue
<topology>
  <gateway>
    ...
    <provider>
       <role>ha</role>
       <name>HAProvider</name>
       <param>
           <name>webhdfs.ha</name>
           <value>failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true</value>
       </param>
       <param>
           <name>namenode.ha</name>
           <value>failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true</value>
       </param>
    </provider>
    ...
  </gateway>
  ...
  <service>
     <role>WEBHDFS</role>
     <url>machine1.example.com:50070</url>
     <url>machine2.example.com:50070</url>
  </service>
  ...
  <service>
     <role>NAMENODE</role>
     <url>machine1.example.com:8020</url>
     <url>machine2.example.com:8020</url>
  </service>
...
</topology>

Parameter descriptions:

  • failover_strategy – selects how the active service is determined and carries the strategy's configuration parameters. The default value is BaseStrategy, which takes the following parameters:
  • retryCount – how many times Knox will ping the NameNode before deciding that the NameNode is down.
  • timeoutInterval – the connection timeout interval, in milliseconds.
  • enabled – indicates whether the HAProvider is active for the service.
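The parameter value packs these settings into a single semicolon-delimited string. As an illustration (not part of the proposed API — HaParamParser and parse are hypothetical names), a small helper could split such a value into a map:

```java
import java.util.HashMap;
import java.util.Map;

public class HaParamParser {
    // Splits a value like
    // "failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true"
    // into a key/value map. Pairs without an '=' are ignored.
    public static Map<String, String> parse(String value) {
        Map<String, String> params = new HashMap<String, String>();
        for (String pair : value.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                params.put(kv[0].trim(), kv[1].trim());
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p = parse(
            "failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true");
        System.out.println(p.get("retryCount")); // prints 3
        System.out.println(p.get("enabled"));    // prints true
    }
}
```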

 

 

Code Block
languagejava
titleHaBaseStrategyHostMapper
linenumberstrue
public class HaBaseStrategyHostMapper implements HostMapper {

    @Override
    public String resolveInboundHostName(String inboundHost) {
        //TODO: implement host resolution here; pass the host through unchanged for now
        return inboundHost;
    }

    @Override
    public String resolveOutboundHostName(String outboundHost) {
        //TODO: implement host resolution here; pass the host through unchanged for now
        return outboundHost;
    }
}
Code Block
languagejava
titleHaUrlRewriteFunctionDescriptor
linenumberstrue
public class HaUrlRewriteFunctionDescriptor implements UrlRewriteFunctionDescriptor<HaUrlRewriteFunctionDescriptor> {
    public static final String FUNCTION_NAME = "ha-rewrite";
    private String configLocation;
    @Override
    public String name() {
        return FUNCTION_NAME;
    }
    public HaUrlRewriteFunctionDescriptor config( String configLocation ) {
        this.configLocation = configLocation;
        return this;
    }
    public String config() {
        return configLocation;
    }
    public String getConfig() {
        return config();
    }
    public void setConfig( String configLocation ) {
        config( configLocation );
    }
}
Code Block
languagejava
titleHaUrlRewriteFunctionProcessor
linenumberstrue
public class HaUrlRewriteFunctionProcessor implements UrlRewriteFunctionProcessor<HaUrlRewriteFunctionDescriptor> {
    private HostMapperService hostMapperService;
    private HostMapper hostMapper = null;
    private String clusterName;

    @Override
    public String name() {
        return HaUrlRewriteFunctionDescriptor.FUNCTION_NAME;
    }
    @Override
    public void initialize(UrlRewriteEnvironment environment, HaUrlRewriteFunctionDescriptor descriptor) throws Exception {
        hostMapper = new HaBaseStrategyHostMapper();
        clusterName = environment.getAttribute(  GatewayServices.GATEWAY_CLUSTER_ATTRIBUTE );
        GatewayServices services = environment.getAttribute( GatewayServices.GATEWAY_SERVICES_ATTRIBUTE );
        if( clusterName != null && services != null ) {
            hostMapperService = services.getService( GatewayServices.HOST_MAPPING_SERVICE );
            if( hostMapperService != null ) {
                hostMapperService.registerHostMapperForCluster( clusterName, hostMapper );
            }
        }
    }
    @Override
    public void destroy() throws Exception {
        if( hostMapperService != null && clusterName != null ) {
            hostMapperService.removeHostMapperForCluster( clusterName );
        }
    }
    @Override
    public List<String> resolve(UrlRewriteContext context, List<String> parameters) throws Exception {
        List<String> result = null;
        if( parameters != null ) {
            result = new ArrayList<String>( parameters.size() );
            for( String parameter : parameters ) {
                switch( context.getDirection() ) {
                    case IN:
                        parameter = hostMapper.resolveInboundHostName( parameter );
                        break;
                    case OUT:
                        parameter = hostMapper.resolveOutboundHostName( parameter );
                        break;
                }
                result.add( parameter );
            }
        }
        return result;
    }
}
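The processor delegates the actual name translation to the HostMapper. A self-contained illustration of the inbound/outbound resolution such a mapper performs might look as follows; SimpleHostMapper and its mapping table are invented for this sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of inbound/outbound host resolution.
// The gateway-facing and cluster-internal host names here are made up.
public class SimpleHostMapper {
    private final Map<String, String> inbound = new HashMap<String, String>();
    private final Map<String, String> outbound = new HashMap<String, String>();

    public SimpleHostMapper() {
        // IN: gateway-facing name -> cluster-internal name
        inbound.put("gateway.example.com", "machine1.example.com");
        // OUT: cluster-internal name -> gateway-facing name
        outbound.put("machine1.example.com", "gateway.example.com");
    }

    public String resolveInboundHostName(String host) {
        return inbound.getOrDefault(host, host); // pass unknown hosts through
    }

    public String resolveOutboundHostName(String host) {
        return outbound.getOrDefault(host, host);
    }
}
```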

 

Managing multiple requests and thread safety

 

See Pic#4 for a time diagram of thread-safe HA processing.

Pic#4 - Time diagram of thread-safe HA processing (image not available)

 

  1. Suppose we have two requests (Request#1 and Request#2), and Request#1 starts processing earlier than Request#2.
  2. Request#1 ends with a timeout exception, which triggers failover. Class HaActiveServiceResolver has a method resetStateToFailOver() that switches to the next URL in the list defined in the <service> tag, according to the strategy.
  3. While the switch in HaActiveServiceResolver is happening, no other thread can perform a switch, because resetStateToFailOver() is synchronized.
  4. Threads obtain the current URL through the getActiveService() method of HaActiveServiceResolver.
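The steps above can be sketched as a self-contained retry loop. HaRetryDemo is a stand-in, not the proposed implementation: it folds the resolver's index handling and a simulated transport call into one class, with the first URL always timing out so that failover is exercised:

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the retry/failover flow. The URL list and index play the
// role of HaActiveServiceResolver; sendRequest is a hypothetical transport
// call, simulated here so that machine1 always times out.
public class HaRetryDemo {
    private final List<String> urls = Arrays.asList(
            "machine1.example.com:50070", "machine2.example.com:50070");
    private int currentIndex = 0;

    // Plays the role of resetStateToFailOver(): advance to the next URL.
    private synchronized void failOver() {
        currentIndex = (currentIndex + 1) % urls.size();
    }

    // Simulated request: machine1 is "down" in this sketch.
    private String sendRequest(String url) {
        if (url.startsWith("machine1")) {
            throw new RuntimeException("timeout contacting " + url);
        }
        return "200 OK from " + url;
    }

    public String execute(int retryCount) {
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= retryCount; attempt++) {
            String url = urls.get(currentIndex); // getActiveService()
            try {
                return sendRequest(url);
            } catch (RuntimeException e) {
                lastFailure = e;                 // timeout triggers failover
                failOver();                      // resetStateToFailOver()
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        // First attempt fails over from machine1, second succeeds on machine2.
        System.out.println(new HaRetryDemo().execute(3));
    }
}
```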

 

See the class diagram for HaActiveServiceResolver.

Pic#5 - Class diagram

PlantUML Macro
titleHaActiveServiceResolver class diagram
@startuml
interface FailoverStrategy {
  +getActiveServiceIndex()
}
class HaBaseStrategy {
}
class HaActiveServiceResolver {
  -currentServiceIndex: int
  -strategy: FailoverStrategy
  +resetStateToFailOver()
  +getActiveService()
}
FailoverStrategy <|.. HaBaseStrategy
HaBaseStrategy <.. HaActiveServiceResolver: <<uses>>
note "Performs failover operation" as N1
HaActiveServiceResolver .. N1
@enduml

 

See the initial implementation of HaActiveServiceResolver.

Code Block
languagejava
titleHaActiveServiceResolver
linenumberstrue
public class HaActiveServiceResolver {
    private int currentServiceIndex;
    private Map<ServiceRole, List<String>> servicesMap = new HashMap<ServiceRole, List<String>>();
    private static HaActiveServiceResolver instance = new HaActiveServiceResolver();
    private FailoverStrategy strategy;

    private HaActiveServiceResolver() {
        //TODO: initialize services here with data from xml file
        servicesMap.put(ServiceRole.WEBHDFS, new ArrayList<String>());
        servicesMap.put(ServiceRole.NAMENODE, new ArrayList<String>());
        strategy = new HaBaseStrategy();
        currentServiceIndex = 0;
    }

    // Synchronized so that only one thread at a time can switch the active service.
    public synchronized void resetStateToFailOver(ServiceRole serviceRole) {
        currentServiceIndex = strategy.getActiveServiceIndex(currentServiceIndex, servicesMap.get(serviceRole));
    }

    public static HaActiveServiceResolver getInstance() {
        return instance;
    }

    // Synchronized as well, so readers always see the index published by the latest failover.
    public synchronized String getActiveService(ServiceRole serviceRole) {
        return servicesMap.get(serviceRole).get(currentServiceIndex);
    }
}

 

See the initial implementation of HaBaseStrategy.

Code Block
languagejava
titleHaBaseStrategy
linenumberstrue
public class HaBaseStrategy implements FailoverStrategy {
    @Override
    public int getActiveServiceIndex(int currentIndex, List<String> services) {
        int newIndex = currentIndex + 1;
        if(newIndex >= services.size()){
            newIndex = 0;
        }
        return newIndex;
    }
}
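A standalone check of the round-robin behavior: the helper below reproduces the index arithmetic, wrapping back to 0 once the incremented index reaches the service count (hence the >= comparison). WrapAroundDemo is illustrative only:

```java
// Demonstrates the round-robin index wrap-around used by the failover
// strategy: with two services the index cycles 0 -> 1 -> 0.
public class WrapAroundDemo {
    static int nextIndex(int currentIndex, int serviceCount) {
        int newIndex = currentIndex + 1;
        if (newIndex >= serviceCount) { // reached the end of the list
            newIndex = 0;               // wrap back to the first service
        }
        return newIndex;
    }

    public static void main(String[] args) {
        int index = 0;
        index = nextIndex(index, 2); // advances to 1
        index = nextIndex(index, 2); // wraps back to 0
        System.out.println(index);   // prints 0
    }
}
```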

See also the WebHDFS HA section of the Knox User's Guide: http://knox.apache.org/books/knox-0-5-0/knox-0-5-0.html#WebHDFS