HDFS Study Notes (3): RPC Retry

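To standardize how its internal components retry failed operations, Hadoop wraps retry logic in a single shared abstraction. Code that needs to retry on failure calls RetryProxy.create() to create a proxy object: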

public static <T> Object create(Class<T> iface, T implementation,
                              RetryPolicy retryPolicy)
                              
public static <T> Object create(Class<T> iface,
      FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy)
      
public static <T> Object create(Class<T> iface, T implementation,
                              Map<String,RetryPolicy> methodNameToPolicyMap)

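The proxy object is created with Java's built-in dynamic proxy mechanism (Proxy.newProxyInstance()). As the parameters above show, the caller also supplies the retry policy (a single policy or a per-method map) and, for failover, a provider of stub objects. When a proxied method throws an exception, RetryInvocationHandler catches it and handles it according to the RetryPolicy: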

  @Override
  public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
    RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
    if (policy == null) {
      policy = defaultPolicy;
    }
    ......
    while (true) {
      ......
      try {
        Object ret = invokeMethod(method, args);
        hasMadeASuccessfulCall = true;
        return ret;
      } catch (Exception e) {
        ......
        RetryAction action = policy.shouldRetry(e, retries++,
            invocationFailoverCount, isIdempotentOrAtMostOnce);
        if (action.action == RetryAction.RetryDecision.FAIL) {
          if (action.reason != null) {
            LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
                + "." + method.getName() + " over " + currentProxy.proxyInfo
                + ". Not retrying because " + action.reason, e);
          }
          throw e;
        } else { // retry or failover
          ......
        }
      }
    }
  }
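
To make the pattern concrete, here is a minimal, self-contained sketch of a retrying dynamic proxy. It is not Hadoop's code: Greeter, FlakyGreeter, SimpleRetryHandler and the maxRetries parameter are hypothetical names introduced only for illustration, and the "policy" is reduced to a fixed retry count instead of a RetryPolicy object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class RetryProxySketch {
  /** Hypothetical service interface, used only for this illustration. */
  public interface Greeter {
    String greet(String name);
  }

  /** Hypothetical implementation that fails a fixed number of times, then succeeds. */
  public static class FlakyGreeter implements Greeter {
    private int remainingFailures;
    public int calls = 0;

    public FlakyGreeter(int failures) {
      this.remainingFailures = failures;
    }

    @Override
    public String greet(String name) {
      calls++;
      if (remainingFailures > 0) {
        remainingFailures--;
        throw new IllegalStateException("transient failure");
      }
      return "hello " + name;
    }
  }

  /** Simplified stand-in for RetryInvocationHandler: retries up to maxRetries times. */
  static class SimpleRetryHandler implements InvocationHandler {
    private final Object target;
    private final int maxRetries;

    SimpleRetryHandler(Object target, int maxRetries) {
      this.target = target;
      this.maxRetries = maxRetries;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      int retries = 0;
      while (true) {
        try {
          return method.invoke(target, args);
        } catch (InvocationTargetException e) {
          // Reflection wraps the real exception; unwrap it before deciding.
          if (retries++ >= maxRetries) {
            throw e.getCause(); // retries exhausted: fail with the original exception
          }
          // otherwise loop and call the method again
        }
      }
    }
  }

  /** Wraps implementation in a JDK dynamic proxy that retries failed calls. */
  public static <T> T create(Class<T> iface, T implementation, int maxRetries) {
    Object proxy = Proxy.newProxyInstance(
        iface.getClassLoader(),
        new Class<?>[] {iface},
        new SimpleRetryHandler(implementation, maxRetries));
    return iface.cast(proxy);
  }

  public static void main(String[] args) {
    FlakyGreeter impl = new FlakyGreeter(2);
    Greeter greeter = create(Greeter.class, impl, 3);
    System.out.println(greeter.greet("hdfs") + " after " + impl.calls + " calls");
  }
}
```

The caller only ever sees the Greeter interface, so retrying stays invisible at the call site. That is the same reason Hadoop can add retries around an existing protocol interface without changing the code that calls it.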

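RPC retries are implemented with the same mechanism. Take the NameNode as an example: if HA is enabled in the HDFS configuration, the client wraps the ClientProtocol proxy in an outer retry proxy when it creates it, so that a failed RPC is retried: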

public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
      URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
        createFailoverProxyProvider(conf, nameNodeUri, xface, true,
          fallbackToSimpleAuth);
  
    if (failoverProxyProvider == null) {
      // Non-HA case
      return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
          UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
    } else {
      // HA case
      Conf config = new Conf(conf);
      T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
          RetryPolicies.failoverOnNetworkException(
              RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
              config.maxRetryAttempts, config.failoverSleepBaseMillis,
              config.failoverSleepMaxMillis));

      Text dtService;
      if (failoverProxyProvider.useLogicalURI()) {
        dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
            HdfsConstants.HDFS_URI_SCHEME);
      } else {
        dtService = SecurityUtil.buildTokenService(
            NameNode.getAddress(nameNodeUri));
      }
      return new ProxyAndInfo<T>(proxy, dtService,
          NameNode.getAddress(nameNodeUri));
    }
  }
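
The HA branch above combines two ideas: a provider that can switch to another stub, and a sleep between attempts bounded by a base and a maximum. The sketch below illustrates that combination under stated assumptions. NameService, RoundRobinProvider, FailoverHandler and backoffMillis are hypothetical names, the capped-doubling delay is an assumption made for this example, and the real RetryPolicies.failoverOnNetworkException also inspects the exception type and the idempotency of the call, which is omitted here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

class FailoverSketch {
  /** Hypothetical protocol interface, used only for this illustration. */
  public interface NameService {
    String getStatus();
  }

  /** Hypothetical provider that cycles through a fixed list of candidate stubs. */
  public static class RoundRobinProvider<T> {
    private final List<T> candidates;
    private int current = 0;

    public RoundRobinProvider(List<T> candidates) {
      this.candidates = candidates;
    }

    public T getProxy() {
      return candidates.get(current);
    }

    public void performFailover() {
      current = (current + 1) % candidates.size();
    }
  }

  /** Assumed delay rule: baseMillis doubled per failover, capped at maxMillis. */
  public static long backoffMillis(long baseMillis, long maxMillis, int failovers) {
    long delay = baseMillis << Math.min(failovers, 20); // bound the shift to avoid overflow
    return Math.min(delay, maxMillis);
  }

  /** On any failure: sleep, switch to the next stub, and try again. */
  static class FailoverHandler<T> implements InvocationHandler {
    private final RoundRobinProvider<T> provider;
    private final int maxFailovers;
    private final long baseMillis;
    private final long maxMillis;

    FailoverHandler(RoundRobinProvider<T> provider, int maxFailovers,
        long baseMillis, long maxMillis) {
      this.provider = provider;
      this.maxFailovers = maxFailovers;
      this.baseMillis = baseMillis;
      this.maxMillis = maxMillis;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      int failovers = 0;
      while (true) {
        try {
          return method.invoke(provider.getProxy(), args);
        } catch (InvocationTargetException e) {
          if (failovers >= maxFailovers) {
            throw e.getCause(); // failover budget exhausted
          }
          Thread.sleep(backoffMillis(baseMillis, maxMillis, failovers));
          provider.performFailover();
          failovers++;
        }
      }
    }
  }

  public static <T> T create(Class<T> iface, RoundRobinProvider<T> provider,
      int maxFailovers, long baseMillis, long maxMillis) {
    Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
        new FailoverHandler<T>(provider, maxFailovers, baseMillis, maxMillis));
    return iface.cast(proxy);
  }

  public static void main(String[] args) {
    NameService standby = () -> { throw new IllegalStateException("standby"); };
    NameService active = () -> "active";
    RoundRobinProvider<NameService> provider =
        new RoundRobinProvider<>(Arrays.asList(standby, active));
    NameService ns = create(NameService.class, provider, 2, 1L, 4L);
    System.out.println(ns.getStatus());
  }
}
```

In this sketch the first stub always fails, so the handler sleeps briefly, switches to the second stub, and the call then succeeds without the caller noticing the switch.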