...
Provider configuration example
The provider configuration enables or disables the HA provider and binds a failover strategy to it. An alias groups a list of Hadoop services (in our case the active and standby NameNodes) into a single entity.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
<topology>
<gateway>
...
<provider>
<role>ha</role>
<name>HAProvider</name>
<param>
<name>webhdfs.ha</name>
<value>failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true</value>
</param>
<param>
<name>namenode.ha</name>
<value>failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true</value>
</param>
</provider>
...
</gateway>
...
<service>
<role>WEBHDFS</role>
<url>machine1.example.com:50070</url>
<url>machine2.example.com:50070</url>
</service>
...
<service>
<role>NAMENODE</role>
<url>machine1.example.com:8020</url>
<url>machine2.example.com:8020</url>
</service>
...
</topology> |
Parameter descriptions:
- failover_strategy – indicates how the active service is determined and carries the strategy's configuration parameters. The default value is BaseStrategy. BaseStrategy has the following parameters:
- retryCount – the number of times Knox pings a NameNode before deciding that the NameNode is down.
- timeoutInterval – the connection timeout interval.
- enabled – indicates whether the HA provider is active for the service.
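The parameter value is a single string of semicolon-separated key=value pairs. The following is a minimal sketch of how such a value could be split into individual settings. The class name HaParamParser and its parse() method are hypothetical and are not part of Knox; the sketch only illustrates the format shown in the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Knox class): splits a value such as
// "failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true"
// into its individual settings.
public class HaParamParser {

    // Returns the key=value pairs in declaration order; entries without '=' are skipped.
    public static Map<String, String> parse(String value) {
        Map<String, String> params = new LinkedHashMap<>();
        if (value == null) {
            return params;
        }
        for (String pair : value.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(
            "failover_strategy=BaseStrategy;retryCount=3;timeoutInterval=5000;enabled=true");
        int retryCount = Integer.parseInt(params.get("retryCount"));
        int timeoutInterval = Integer.parseInt(params.get("timeoutInterval"));
        boolean enabled = Boolean.parseBoolean(params.get("enabled"));
        System.out.println(params.get("failover_strategy") + " " + retryCount
            + " " + timeoutInterval + " " + enabled);
    }
}
```

Keeping all settings in one value keeps the topology file compact, at the cost of needing a small parser like this one on the gateway side.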
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class HaBaseStrategyHostMapper implements HostMapper {
@Override
public String resolveInboundHostName(String inboundHost) {
//TODO: implement host resolution here
return null;
}
@Override
public String resolveOutboundHostName(String outboundHost) {
//TODO: implement host resolution here
return null;
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class HaUrlRewriteFunctionDescriptor implements UrlRewriteFunctionDescriptor<HaUrlRewriteFunctionDescriptor> {
public static final String FUNCTION_NAME = "ha-rewrite";
private String configLocation;
@Override
public String name() {
return FUNCTION_NAME;
}
public HaUrlRewriteFunctionDescriptor config( String configLocation ) {
this.configLocation = configLocation;
return this;
}
public String config() {
return configLocation;
}
public String getConfig() {
return config();
}
public void setConfig( String configLocation ) {
config( configLocation );
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class HaUrlRewriteFunctionProcessor implements UrlRewriteFunctionProcessor<HaUrlRewriteFunctionDescriptor> {
private HostMapperService hostMapperService;
private HostMapper hostMapper = null;
private String clusterName;
@Override
public String name() {
return HaUrlRewriteFunctionDescriptor.FUNCTION_NAME;
}
@Override
public void initialize(UrlRewriteEnvironment environment, HaUrlRewriteFunctionDescriptor descriptor) throws Exception {
hostMapper = new HaBaseStrategyHostMapper();
clusterName = environment.getAttribute( GatewayServices.GATEWAY_CLUSTER_ATTRIBUTE );
GatewayServices services = environment.getAttribute( GatewayServices.GATEWAY_SERVICES_ATTRIBUTE );
if( clusterName != null && services != null ) {
hostMapperService = services.getService( GatewayServices.HOST_MAPPING_SERVICE );
if( hostMapperService != null ) {
hostMapperService.registerHostMapperForCluster( clusterName, hostMapper );
}
}
}
@Override
public void destroy() throws Exception {
if( hostMapperService != null && clusterName != null ) {
hostMapperService.removeHostMapperForCluster( clusterName );
}
}
@Override
public List<String> resolve(UrlRewriteContext context, List<String> parameters) throws Exception {
List<String> result = null;
if( parameters != null ) {
result = new ArrayList<String>( parameters.size() );
for( String parameter : parameters ) {
switch( context.getDirection() ) {
case IN:
parameter = hostMapper.resolveInboundHostName( parameter );
break;
case OUT:
parameter = hostMapper.resolveOutboundHostName( parameter );
break;
}
result.add( parameter );
}
}
return result;
}
} |
Managing multiple requests and thread safety
See Pic#4 for the time diagram of thread-safe HA processing.
Pic#4 - Time diagram for thread-safe HA processing
- Suppose we have two requests (Request#1 and Request#2). Processing of Request#1 starts earlier than processing of Request#2.
- Request#1 ends with a timeout exception. This triggers failover. The class HaActiveServiceResolver has a method resetStateToFailOver(), which switches to the next URL in the list defined in the <service> tag, according to the strategy.
- While the switch in HaActiveServiceResolver is in progress, no other thread can perform a switch, because resetStateToFailOver() is synchronized.
- Threads obtain the active URL through the getActiveService() method of HaActiveServiceResolver.
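The steps above can be sketched as a small, self-contained program. Note that this is an illustration under stated assumptions, not the Knox implementation: the class FailoverSketch and its failOver(String failedUrl) method are hypothetical. In particular, passing the failed URL into the switch is an assumption added here so that two requests that time out against the same URL trigger only one switch; the resetStateToFailOver() method described above does not take such an argument.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not a Knox class) of a synchronized failover switch.
public class FailoverSketch {
    private final List<String> urls;
    private int activeIndex = 0; // guarded by "this"

    public FailoverSketch(List<String> urls) {
        this.urls = urls;
    }

    // Returns the URL that requests should currently use.
    public synchronized String getActiveService() {
        return urls.get(activeIndex);
    }

    // Switches to the next URL only if failedUrl is still the active one.
    // Assumption: callers report the URL they failed against.
    public synchronized boolean failOver(String failedUrl) {
        if (!urls.get(activeIndex).equals(failedUrl)) {
            return false; // another thread has already switched
        }
        activeIndex = (activeIndex + 1) % urls.size();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        FailoverSketch resolver = new FailoverSketch(Arrays.asList(
            "machine1.example.com:50070", "machine2.example.com:50070"));
        // Both requests were sent to the same URL and both time out.
        String failedUrl = resolver.getActiveService();
        Runnable failedRequest = () -> resolver.failOver(failedUrl);
        Thread request1 = new Thread(failedRequest, "Request#1");
        Thread request2 = new Thread(failedRequest, "Request#2");
        request1.start();
        request2.start();
        request1.join();
        request2.join();
        // Only one switch happened, so this prints machine2.example.com:50070
        System.out.println(resolver.getActiveService());
    }
}
```

Without the check on failedUrl, the second failing request would switch again and, with two URLs in the list, move the resolver back to the service that just timed out.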
See the class diagram for HaActiveServiceResolver.
Pic#5 - Class diagram
PlantUML | ||
---|---|---|
| ||
@startuml
interface FailoverStrategy {
+getActiveServiceIndex()
}
class HaBaseStrategy {
}
class HaActiveServiceResolver {
-currentServiceIndex: int
-strategy: FailoverStrategy
+resetStateToFailOver()
+getActiveService()
}
FailoverStrategy <|.. HaBaseStrategy
HaBaseStrategy <.. HaActiveServiceResolver: <<uses>>
note "Performs failover operation" as N1
HaActiveServiceResolver .. N1
@enduml |
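To make the relationship in the diagram concrete, here is a minimal sketch of the strategy contract. The two-argument form of getActiveServiceIndex() mirrors the way HaActiveServiceResolver calls it; the wrap-around "next URL in the list" behaviour and the wrapper class StrategySketch are assumptions made for illustration and may differ from the actual HaBaseStrategy.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical wrapper class so the sketch compiles as a single file.
public class StrategySketch {

    // Contract from the class diagram: given the current index and the
    // configured URLs, return the index of the service to use next.
    interface FailoverStrategy {
        int getActiveServiceIndex(int currentIndex, List<String> services);
    }

    // Assumed behaviour: move to the next URL, wrapping to the first one.
    static class HaBaseStrategy implements FailoverStrategy {
        @Override
        public int getActiveServiceIndex(int currentIndex, List<String> services) {
            if (services == null || services.isEmpty()) {
                throw new IllegalArgumentException("no services configured");
            }
            return (currentIndex + 1) % services.size();
        }
    }

    public static void main(String[] args) {
        List<String> nameNodes = Arrays.asList(
            "machine1.example.com:8020", "machine2.example.com:8020");
        FailoverStrategy strategy = new HaBaseStrategy();
        int next = strategy.getActiveServiceIndex(0, nameNodes);
        System.out.println(nameNodes.get(next)); // prints machine2.example.com:8020
    }
}
```

Keeping the selection logic behind an interface lets the resolver stay unchanged if a different strategy is configured through failover_strategy.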
See the initial implementation of HaActiveServiceResolver.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class HaActiveServiceResolver {
// volatile: getActiveService() is not synchronized, so readers must still see the latest switch
private volatile int currentServiceIndex;
private final Map<ServiceRole, List<String>> servicesMap = new HashMap<ServiceRole, List<String>>();
private static final HaActiveServiceResolver instance = new HaActiveServiceResolver();
private final FailoverStrategy strategy;
private HaActiveServiceResolver(){
//TODO: initialize services here with data from xml file
servicesMap.put(ServiceRole.WEBHDFS, new ArrayList<String>());
servicesMap.put(ServiceRole.NAMENODE, new ArrayList<String>());
strategy = new HaBaseStrategy();
currentServiceIndex = 0;
}
// synchronized: only one thread at a time may perform the switch
public synchronized void resetStateToFailOver(ServiceRole serviceRole){
currentServiceIndex = strategy.getActiveServiceIndex(currentServiceIndex, servicesMap.get(serviceRole));
}
public static HaActiveServiceResolver getInstance(){
return instance;
}
// Returns the URL currently considered active for the given role
public String getActiveService(ServiceRole serviceRole){
return servicesMap.get(serviceRole).get(currentServiceIndex);
}
} |
See the initial implementation of HaBaseStrategy.
...
...
For more details, see the WebHDFS HA section: http://knox.apache.org/books/knox-0-5-0/knox-0-5-0.html#WebHDFS