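To standardize retry logic across its internal components, Hadoop wraps retries in one shared mechanism. Code that needs to retry on failure calls RetryProxy.create()
to create a proxy object: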
public static <T> Object create(Class<T> iface, T implementation,
    RetryPolicy retryPolicy)

public static <T> Object create(Class<T> iface,
    FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy)

public static <T> Object create(Class<T> iface, T implementation,
    Map<String,RetryPolicy> methodNameToPolicyMap)
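The proxy object is created with Java's built-in dynamic proxy support (Proxy.newProxyInstance()). As the method parameters above show, the caller must also supply a retry policy and, for failover, a provider of stub objects (FailoverProxyProvider). When a proxied method throws an exception, RetryInvocationHandler catches it and handles it according to the RetryPolicy: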
@Override
public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
  RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
  if (policy == null) {
    policy = defaultPolicy;
  }
  ......
  while (true) {
    ......
    try {
      Object ret = invokeMethod(method, args);
      hasMadeASuccessfulCall = true;
      return ret;
    } catch (Exception e) {
      ......
      RetryAction action = policy.shouldRetry(e, retries++,
          invocationFailoverCount, isIdempotentOrAtMostOnce);
      if (action.action == RetryAction.RetryDecision.FAIL) {
        if (action.reason != null) {
          LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
              + "." + method.getName() + " over " + currentProxy.proxyInfo
              + ". Not retrying because " + action.reason, e);
        }
        throw e;
      } else { // retry or failover
        ......
      }
    }
  }
}
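The pattern above can be reproduced with nothing but the JDK. The following is a minimal sketch, not Hadoop's implementation: SimpleRetryPolicy, Greeter, retryUpTo() and demo() are hypothetical names invented for illustration, and the policy only answers "retry or not" instead of returning a RetryAction. It does show the two essential pieces: a per-method policy lookup with a default fallback, and a loop that re-invokes the target until the policy says stop.

```java
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Map;

final class RetryProxySketch {
    /** Hypothetical, simplified stand-in for a retry policy. */
    interface SimpleRetryPolicy {
        boolean shouldRetry(Exception e, int retries);
    }

    /** Hypothetical service interface used only by this demo. */
    interface Greeter {
        String greet(String name) throws Exception;
    }

    /** Policy that allows at most maxRetries retries, whatever the exception. */
    static SimpleRetryPolicy retryUpTo(int maxRetries) {
        return (e, retries) -> retries < maxRetries;
    }

    /** Wraps impl in a dynamic proxy that retries failed calls. */
    static <T> T create(Class<T> iface, T impl, SimpleRetryPolicy defaultPolicy,
                        Map<String, SimpleRetryPolicy> methodNameToPolicy) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Per-method policy first, default policy otherwise.
            SimpleRetryPolicy policy =
                methodNameToPolicy.getOrDefault(method.getName(), defaultPolicy);
            int retries = 0;
            while (true) {
                try {
                    return method.invoke(impl, args);
                } catch (InvocationTargetException ite) {
                    // Reflection wraps the target's exception; unwrap it.
                    Throwable cause = ite.getCause();
                    if (!(cause instanceof Exception)) {
                        throw cause; // Errors are never retried in this sketch
                    }
                    Exception e = (Exception) cause;
                    if (!policy.shouldRetry(e, retries++)) {
                        throw e; // policy says give up
                    }
                    // otherwise loop and invoke again
                }
            }
        };
        return iface.cast(Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    /** Calls a target that fails `failures` times before succeeding. */
    static String demo(int failures, int maxRetries) {
        int[] calls = {0};
        Greeter flaky = name -> {
            if (calls[0]++ < failures) {
                throw new IOException("transient failure");
            }
            return "hello " + name;
        };
        Greeter proxy = create(Greeter.class, flaky, retryUpTo(maxRetries),
            Collections.emptyMap());
        try {
            String result = proxy.greet("hadoop");
            return result + " after " + calls[0] + " calls";
        } catch (Exception e) {
            return "gave up after " + calls[0] + " calls: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2, 3)); // enough retries: the third call succeeds
        System.out.println(demo(2, 1)); // too few retries: the exception propagates
    }
}
```

The caller only ever sees the Greeter interface; whether a call was retried is invisible unless the policy finally gives up, which is the property that lets Hadoop add retries to existing protocol interfaces without changing their callers.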
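RPC retries are built on the same mechanism. Take the NameNode as an example: if HA is enabled in the HDFS configuration, the client wraps the ClientProtocol proxy in an outer retry proxy when it creates it, so that calls are retried when an RPC fails: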
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
    URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
      createFailoverProxyProvider(conf, nameNodeUri, xface, true,
          fallbackToSimpleAuth);
  if (failoverProxyProvider == null) {
    // Non-HA case
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
        UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
  } else {
    // HA case
    Conf config = new Conf(conf);
    T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
        RetryPolicies.failoverOnNetworkException(
            RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
            config.maxRetryAttempts, config.failoverSleepBaseMillis,
            config.failoverSleepMaxMillis));
    Text dtService;
    if (failoverProxyProvider.useLogicalURI()) {
      dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
          HdfsConstants.HDFS_URI_SCHEME);
    } else {
      dtService = SecurityUtil.buildTokenService(
          NameNode.getAddress(nameNodeUri));
    }
    return new ProxyAndInfo<T>(proxy, dtService,
        NameNode.getAddress(nameNodeUri));
  }
}
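The last two arguments passed to failoverOnNetworkException() above, failoverSleepBaseMillis and failoverSleepMaxMillis, bound how long the client waits between failover attempts. A common way to use such a base/maximum pair is capped exponential backoff with random jitter. The sketch below illustrates that general technique only; FailoverBackoffSketch and its methods are hypothetical names, the 500 ms and 15000 ms values are made up for the example, and the exact formula Hadoop applies may differ.

```java
import java.util.concurrent.ThreadLocalRandom;

final class FailoverBackoffSketch {
    /** Deterministic part: baseMillis * 2^attempt, capped at maxMillis. */
    static long cappedExponential(long baseMillis, int attempt, long maxMillis) {
        // Clamp the shift so large attempt counts cannot overflow;
        // this assumes baseMillis is a modest value (seconds, not years).
        int shift = Math.min(Math.max(attempt, 0), 30);
        return Math.min(baseMillis * (1L << shift), maxMillis);
    }

    /** Scales the capped delay by a random factor in [0.5, 1.5). */
    static long sleepMillis(long baseMillis, int attempt, long maxMillis) {
        // Jitter keeps many clients from retrying in lockstep after a failover.
        double jitter = ThreadLocalRandom.current().nextDouble() + 0.5;
        return (long) (cappedExponential(baseMillis, attempt, maxMillis) * jitter);
    }

    public static void main(String[] args) {
        long base = 500;   // hypothetical base delay in milliseconds
        long max = 15000;  // hypothetical cap in milliseconds
        for (int attempt = 0; attempt < 8; attempt++) {
            System.out.println("attempt " + attempt
                + ": capped=" + cappedExponential(base, attempt, max)
                + " ms, with jitter=" + sleepMillis(base, attempt, max) + " ms");
        }
    }
}
```

With these example values the deterministic delay doubles from 500 ms until it reaches the 15000 ms cap at the fifth attempt, after which only the jitter varies.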