Troubleshooting a Throughput Drop in a Production Flink Job

1. Background

A production Flink ETL job suddenly triggered a Kafka consumer-lag alert. During earlier load testing a parallelism of 150 had been enough, and the job was running in production at a parallelism of 200, yet the lag kept growing.

Kafka monitoring showed a sudden traffic spike, and the ops team confirmed it was caused by one broker node going down. We raised the job's parallelism to 400 to consume faster, but the lag still did not come down: throughput was only about 20% higher than at parallelism 200, and it fluctuated heavily.

2. An operator bottleneck? A GC problem?

At this point we suspected a performance bottleneck in the job itself. The Flink Dashboard showed that some subtasks were consuming far fewer records than the others, so the first guess was that the RPC calls inside the operator's processing logic had become slower. Measuring on the live process with Arthas showed that normal subtasks spent about 0.6 ms per call, while the slow ones spent anywhere from 1.2 ms to 20 ms.
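
For reference, this kind of per-call latency can be sampled with Arthas's monitor/trace commands attached to the TaskManager JVM; the class and method names below are placeholders, not the job's real ones:

monitor -c 10 com.example.etl.RpcEnrichFunction callRemote
trace com.example.etl.RpcEnrichFunction callRemote '#cost > 5'

monitor prints the average and max RT per 10-second window, while trace breaks down any call that takes longer than 5 ms.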

We thought we had found the culprit, but the server-side latency of the RPC service turned out to be completely normal...

So we turned to the abnormal nodes themselves. The Dashboard showed their GC time was clearly higher than on the normal nodes, yet after enabling GC logging the amount of memory reclaimed per collection was not very different. The Arthas dashboard finally showed that the system load on these machines was extremely high, and that was the real reason the throughput of those subtasks dropped.

3. Why did this drag down the whole job's throughput?

Because the YARN cluster did not have cgroup isolation enabled, allocating by vCores alone cannot guarantee that the jobs on a single machine share CPU fairly. But why would the throughput drop of a single node drag down the throughput of the entire job?

I had always assumed that data transfer from an upstream task to its downstream tasks was independent per channel, so a single blocked downstream task would not affect the others. Reading the source code shows that this assumption is wrong.

When emitting a record to downstream tasks, the writer asks the ChannelSelector to pick one channel to write to:

public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {

    // ......

    @Override
    public void emit(T record) throws IOException, InterruptedException {
        // Every record is routed to exactly one output channel.
        emit(record, channelSelector.selectChannel(record));
    }
}
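
For intuition, a channel selector simply maps each record to one channel index. A minimal round-robin selector could look roughly like this (a simplified sketch against the Flink 1.9+ ChannelSelector interface, not the actual RebalancePartitioner source):

// org.apache.flink.runtime.io.network.api.writer.ChannelSelector,
// org.apache.flink.core.memory.IOReadableWritable
public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {

    private int numberOfChannels;
    private int nextChannel = -1;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(T record) {
        // Still one channel per record: if that channel has no free buffers, the emitting task blocks.
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}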

The write then has to obtain a buffer for the selected channel. Even though each output channel's share of the buffer pool is bounded, running out of buffers on any single channel is enough to block the write:

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    // Try to get a buffer for this channel without blocking.
    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);
    if (builder == null) {
        long start = System.currentTimeMillis();
        // No buffer available: block until one is freed. While blocked, the task
        // cannot emit to any of its other channels either.
        builder = targetPartition.getBufferBuilder(targetChannel);
        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
    }
    return builder;
}

In other words, when some node in the cluster performs badly, upstream operators block while writing to it and the job's overall throughput drops; once the backlogged data is worked off, the writes unblock and throughput climbs again, and the cycle repeats. That also explains the large throughput fluctuation we observed.

4. How to mitigate it?

Enabling cgroup isolation on the cluster is the proper fix, but it could affect every job running on it, so it has to be rolled out carefully.
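
For reference, cgroup-based CPU isolation on YARN is roughly configured as below (property names from the stock Hadoop LinuxContainerExecutor/cgroups support; double-check them against your Hadoop version):

<!-- yarn-site.xml -->
<property>
    <name>yarn.nodemanager.container-executor.class</name>
    <value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
</property>
<property>
    <name>yarn.nodemanager.linux-container-executor.resources-handler.class</name>
    <value>org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler</value>
</property>
<property>
    <!-- cap each container at its vCore share instead of letting it grab idle CPU -->
    <name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
    <value>true</value>
</property>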

To recover quickly we went with a temporary workaround instead: since the problem is slow nodes dragging down the whole job, we only need a custom partitioning scheme that distributes data according to each subtask's actual processing capacity:

// Depends on the Flink DataStream API and a Redis cluster client (the Lettuce types below);
// RedisClusterClientFactory is presumably an in-house wrapper that builds a RedisClusterClient from the address.
public class DynamicLoadBalancing {

    private static final String REDIS_ADDR = "";

    public static <R> DataStream<R> rebalance(DataStream<R> dataStream, int parallelism, String slotSharingGroup) throws InterruptedException {
        if (parallelism < 1 || slotSharingGroup == null) throw new IllegalArgumentException();
        /**
         * Part 1. Repartition controller: tags every record with a LoadEvent and routes it
         * to a downstream subtask according to the current per-subtask load factors.
         */
        String loadGroup = UUID.randomUUID().toString();
        DataStream<Tuple2<LoadEvent, R>> repartitionedStream = dataStream.map(new RichMapFunction<R, Tuple2<LoadEvent, R>>() {
            final Random random = new Random();
            final Map<String, Float> loadFactor = new HashMap<>();
            volatile ConcurrentSkipListMap<Integer, String> loadIdMap = new ConcurrentSkipListMap<>();

            RedisClusterClient redisClient;
            StatefulRedisClusterConnection<String, String> connection;
            RedisClusterCommands<String, String> syncCommands;
            ScheduledExecutorService scheduledExecutorService;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                redisClient = RedisClusterClientFactory.createClient(REDIS_ADDR);
                connection = redisClient.connect();
                syncCommands = connection.sync();
                syncCommands.expire(loadGroup, 24 * 3600);
                for (int i = 0; i < parallelism; i++) loadFactor.put(i + "", 0.5f);
                scheduledExecutorService = Executors.newScheduledThreadPool(1);
                scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Every 5 seconds: read the latest per-subtask latency reports from Redis
                            // and rebuild the weighted map used for channel selection.
                            Map<String, String> feedback = syncCommands.hgetall(loadGroup);
                            ConcurrentSkipListMap<Integer, String> newLoadIdMap = new ConcurrentSkipListMap<>();
                            if (feedback == null) {
                                return;
                            }
                            if (feedback.size() == parallelism) {
                                List<Long> curLatencyInOrder = new ArrayList<>();
                                feedback.values().forEach(v -> curLatencyInOrder.add(Long.parseLong(v)));
                                Collections.sort(curLatencyInOrder);
                                Long midLatency = curLatencyInOrder.get(curLatencyInOrder.size() / 2);
                                int idIndex = 0;
                                // Subtasks faster than the median latency get a larger load factor, slower ones a
                                // smaller one; each subtask then owns a key range proportional to its factor.
                                for (Map.Entry<String, String> entry : feedback.entrySet()) {
                                    Long latency = Long.valueOf(entry.getValue());
                                    float newLoad = loadFactor.get(entry.getKey());
                                    if (latency < midLatency) {
                                        newLoad = Math.min(1f, newLoad + .05f);
                                    } else if (latency > midLatency){
                                        newLoad = Math.max(0.001f, newLoad - .05f);
                                    }
                                    loadFactor.put(entry.getKey(), newLoad);
                                    idIndex += (newLoad * 10000);
                                    newLoadIdMap.put(idIndex, entry.getKey());
                                }
                                loadIdMap = newLoadIdMap;
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }, 5, 5, TimeUnit.SECONDS);
            }

            @Override
            public Tuple2<LoadEvent, R> map(R value) throws Exception {
                String loadId;
                if (loadIdMap.size() < parallelism) {
                    // Until every downstream subtask has reported, fall back to uniform random routing.
                    loadId = random.nextInt(parallelism) + "";
                } else {
                    // Weighted sampling: subtasks with a larger load factor own a wider key range.
                    loadId = loadIdMap.ceilingEntry(random.nextInt(loadIdMap.floorKey(Integer.MAX_VALUE)) + 1).getValue();
                }
                return new Tuple2<>(new LoadEvent(loadGroup, loadId, System.currentTimeMillis()), value);
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (scheduledExecutorService != null) scheduledExecutorService.shutdown();
                if (redisClient != null) redisClient.shutdown();
            }
        }).name("load-balance-controller").partitionCustom((Partitioner<LoadEvent>) (key, numPartitions) -> Integer.parseInt(key.id),
                (KeySelector<Tuple2<LoadEvent, R>, LoadEvent>) value -> value.f0);

        /**
         * Part 2. Throughput feedback: each downstream subtask measures how long records take to
         * reach it after being tagged upstream and periodically reports that latency to Redis.
         */
        return repartitionedStream.map(new RichMapFunction<Tuple2<LoadEvent, R>, R>() {
            RedisClusterClient redisClient;
            StatefulRedisClusterConnection<String, String> connection;
            RedisClusterCommands<String, String> syncCommands;
            ScheduledExecutorService scheduledExecutorService;
            volatile Long avgLatency;
            volatile String id;
            Long lastReport;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                redisClient = RedisClusterClientFactory.createClient(REDIS_ADDR);
                connection = redisClient.connect();
                syncCommands = connection.sync();
                scheduledExecutorService = Executors.newScheduledThreadPool(1);
                scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Every 3 seconds: publish this subtask's smoothed latency so the
                            // controller can adjust its load factor (the very first tick is skipped).
                            if (id != null && lastReport != null)
                                syncCommands.hset(loadGroup, id, String.valueOf(avgLatency));
                            lastReport = System.currentTimeMillis();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }, 3, 3, TimeUnit.SECONDS);
            }

            @Override
            public R map(Tuple2<LoadEvent, R> value) throws Exception {
                if (id == null) id = value.f0.getId();
                // Time from when the record was tagged by the controller to when it is processed here,
                // smoothed with a simple running average.
                long currentLatency = System.currentTimeMillis() - value.f0.eventTime;
                avgLatency = avgLatency == null ? currentLatency : (currentLatency + avgLatency) / 2;
                return value.f1;
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (scheduledExecutorService != null) scheduledExecutorService.shutdown();
                if (redisClient != null) redisClient.shutdown();
            }
        }).setParallelism(parallelism).name("load-balance-feedback").slotSharingGroup(slotSharingGroup);
    }

    private static class LoadEvent implements Serializable {
        private String group;
        private String id;
        private Long eventTime;

        public LoadEvent(String group, String id, Long eventTime) {
            this.group = group;
            this.id = id;
            this.eventTime = eventTime;
        }

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public Long getEventTime() {
            return eventTime;
        }

        public void setEventTime(Long eventTime) {
            this.eventTime = eventTime;
        }
    }
}
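
Wiring it into the job is then a small change. The sketch below assumes an existing Kafka source of some Event type and the RPC-enrichment operator from earlier; all names except DataStream, StreamExecutionEnvironment and DynamicLoadBalancing are placeholders:

// Placeholders: Event, buildKafkaSource(), EnrichWithRpcFunction and buildSink() stand in for the job's real classes.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> source = env.addSource(buildKafkaSource()).name("kafka-source");

// Replace the plain rebalance() edge with the feedback-based repartitioning;
// the heavy operator runs in its own slot sharing group at parallelism 400.
DataStream<Event> balanced = DynamicLoadBalancing.rebalance(source, 400, "etl-heavy");

balanced.map(new EnrichWithRpcFunction())
        .name("rpc-enrich")
        .addSink(buildSink());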

5. What are the caveats of this workaround?

For stateless operators this kind of load balancing works fine, but in a stateful job changing the partitioning also means the corresponding state has to be migrated, which is a considerably more complex change; there are already some discussions of this topic online that are worth reading.