1. Background
An online Flink ETL job suddenly triggered a Kafka consumer-lag alert. During load testing a parallelism of 150 had been enough to keep up, and the production job was running with a parallelism of 200, yet the lag kept growing.
Kafka monitoring showed a traffic spike, and the ops team confirmed it was caused by a Broker node going down. We raised the job parallelism to 400 to catch up faster, but the lag still did not come down: throughput was only about 20% higher than at parallelism 200, and it fluctuated heavily.
2. Operator bottleneck? GC problem?
At this point we suspected a performance bottleneck in the job itself. The Flink Dashboard showed that some subtasks were consuming far fewer records than the rest, so the first guess was that the RPC call inside the operator's processing logic had become slower. Measuring it online with Arthas showed the call took about 0.6 ms on healthy nodes, versus 1.2~20 ms on the slow ones.
We thought we had found the culprit, but the RPC server-side latency turned out to be completely normal...
So we turned to the abnormal nodes themselves. The Dashboard showed clearly longer GC time on these nodes, yet after enabling GC logging the amount of memory reclaimed was not much different from the healthy ones. The Arthas dashboard finally revealed that the system load on these machines was extremely high, and that was the real cause of the per-node throughput drop.
3. Why does one slow node drag down the whole job?
Because the YARN cluster does not have CGroup isolation enabled, allocating by vCore alone cannot guarantee that tasks on the same machine get a fair share of CPU. But why would a throughput drop on a single node pull down the throughput of the entire job?
We had always assumed that data transfer from an upstream task to its downstream tasks was independent per channel, i.e. one blocked downstream task would not affect the others. Reading the source code showed that this assumption is wrong:
When emitting a record to downstream tasks, the writer first picks a channel through the ChannelSelector:
public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    //......
    @Override
    public void emit(T record) throws IOException, InterruptedException {
        emit(record, channelSelector.selectChannel(record));
    }
}
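The channel choice itself knows nothing about downstream load. As a rough, simplified stand-in (illustration only, not Flink's actual ChannelSelector interface), a rebalance-style selector just cycles through the channels in turn:

// Simplified stand-in for a rebalance-style selector (illustration only, not
// Flink's actual interface): channels are chosen in turn, with no knowledge of
// how loaded each downstream subtask currently is.
class RoundRobinSelectorSketch<T> {
    private final int numberOfChannels;
    private int nextChannel = -1;

    RoundRobinSelectorSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel(T record) {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }
}

So under the default rebalance partitioning, a slow subtask keeps receiving its full 1/N share of records no matter how far behind it falls.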
The write then has to acquire a buffer for the selected channel. Although each output channel can only take a bounded share of the shared buffers, a buffer shortage on any single channel is enough to block the emitting task:
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);
    if (builder == null) {
        // no buffer available for this channel: getBufferBuilder() blocks until one is freed
        long start = System.currentTimeMillis();
        builder = targetPartition.getBufferBuilder(targetChannel);
        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
    }
    return builder;
}
In other words, when some node in the cluster performs badly, the upstream operators block while emitting to it and overall throughput drops; once the backlog on that channel has been consumed, writes stop blocking and throughput rises again, and then the cycle repeats.
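To see why one slow channel caps the whole writer, here is a toy illustration in plain Java (not Flink code; the queue sizes and delays are made up): a producer round-robins records into bounded per-channel queues, and a single slow consumer is enough to stall it, so even the fast consumer is starved.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SlowChannelDemo {
    public static void main(String[] args) throws Exception {
        // two "output channels", each with a small bounded buffer
        List<BlockingQueue<Integer>> channels =
                List.of(new ArrayBlockingQueue<>(10), new ArrayBlockingQueue<>(10));

        startConsumer(channels.get(0), 1);    // healthy node: ~1000 records/s
        startConsumer(channels.get(1), 100);  // overloaded node: ~10 records/s

        long emitted = 0;
        long lastLog = System.currentTimeMillis();
        for (int record = 0; ; record++) {
            // blocks as soon as the slow channel's buffer is full,
            // even though the fast channel could accept more data
            channels.get(record % channels.size()).put(record);
            emitted++;
            if (System.currentTimeMillis() - lastLog >= 1000) {
                System.out.println("producer records/s: " + emitted);
                emitted = 0;
                lastLog = System.currentTimeMillis();
            }
        }
    }

    private static void startConsumer(BlockingQueue<Integer> queue, long delayMs) {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                    Thread.sleep(delayMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
    }
}

The printed rate settles at roughly twice the slow consumer's rate, which mirrors the capped, fluctuating throughput we saw on the job.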
4. How to mitigate it?
Enabling CGroup isolation on the cluster may affect every job running on it, so that change has to be made carefully. To recover quickly we went with a temporary workaround:
Since the root cause is slow nodes dragging down the whole job, we only need a custom partitioning scheme that distributes records according to how much data each subtask can actually process:
public class DynamicLoadBalancing {
    private static final String REDIS_ADDR = ""; // Redis cluster address, omitted here

    public static <R> DataStream<R> rebalance(DataStream<R> dataStream, int parallelism, String slotSharingGroup) throws InterruptedException {
        if (parallelism < 1 || slotSharingGroup == null) throw new IllegalArgumentException();
        /**
         * Part 1. repartition controller
         */
        String loadGroup = UUID.randomUUID().toString();
        DataStream<Tuple2<LoadEvent, R>> repartitionedStream = dataStream.map(new RichMapFunction<R, Tuple2<LoadEvent, R>>() {
            final Random random = new Random();
            // latest load factor per downstream subtask id
            final Map<String, Float> loadFactor = new HashMap<>();
            // cumulative weight -> subtask id, used for weighted random selection in map()
            volatile ConcurrentSkipListMap<Integer, String> loadIdMap = new ConcurrentSkipListMap<>();
            RedisClusterClient redisClient;
            StatefulRedisClusterConnection<String, String> connection;
            RedisClusterCommands<String, String> syncCommands;
            ScheduledExecutorService scheduledExecutorService;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                redisClient = RedisClusterClientFactory.createClient(REDIS_ADDR);
                connection = redisClient.connect();
                syncCommands = connection.sync();
                syncCommands.expire(loadGroup, 24 * 3600);
                // start with equal weights for every downstream subtask
                for (int i = 0; i < parallelism; i++) loadFactor.put(i + "", 0.5f);
                scheduledExecutorService = Executors.newScheduledThreadPool(1);
                // periodically read the latency feedback from Redis and adjust the weights
                scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Map<String, String> feedback = syncCommands.hgetall(loadGroup);
                            ConcurrentSkipListMap<Integer, String> newLoadIdMap = new ConcurrentSkipListMap<>();
                            if (feedback == null) {
                                return;
                            }
                            // only adjust once every downstream subtask has reported
                            if (feedback.size() == parallelism) {
                                List<Long> curLatencyInOrder = new ArrayList<>();
                                feedback.values().forEach(v -> curLatencyInOrder.add(Long.parseLong(v)));
                                Collections.sort(curLatencyInOrder);
                                Long midLatency = curLatencyInOrder.get(curLatencyInOrder.size() / 2);
                                int idIndex = 0;
                                for (Map.Entry<String, String> entry : feedback.entrySet()) {
                                    Long latency = Long.valueOf(entry.getValue());
                                    float newLoad = loadFactor.get(entry.getKey());
                                    // subtasks faster than the median get more traffic, slower ones get less
                                    if (latency < midLatency) {
                                        newLoad = Math.min(1f, newLoad + .05f);
                                    } else if (latency > midLatency) {
                                        newLoad = Math.max(0.001f, newLoad - .05f);
                                    }
                                    loadFactor.put(entry.getKey(), newLoad);
                                    idIndex += (newLoad * 10000);
                                    newLoadIdMap.put(idIndex, entry.getKey());
                                }
                                loadIdMap = newLoadIdMap;
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }, 5, 5, TimeUnit.SECONDS);
            }
            @Override
            public Tuple2<LoadEvent, R> map(R value) throws Exception {
                String loadId;
                if (loadIdMap.size() < parallelism) {
                    // no complete feedback yet: fall back to uniform random routing
                    loadId = random.nextInt(parallelism) + "";
                } else {
                    // weighted random selection over the cumulative-weight map
                    loadId = loadIdMap.ceilingEntry(random.nextInt(loadIdMap.floorKey(Integer.MAX_VALUE)) + 1).getValue();
                }
                return new Tuple2<>(new LoadEvent(loadGroup, loadId, System.currentTimeMillis()), value);
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (scheduledExecutorService != null) scheduledExecutorService.shutdown();
                if (redisClient != null) redisClient.shutdown();
            }
        }).name("load-balance-controller").partitionCustom((Partitioner<LoadEvent>) (key, numPartitions) -> Integer.parseInt(key.id),
                (KeySelector<Tuple2<LoadEvent, R>, LoadEvent>) value -> value.f0);
        /**
         * Part 2. throughput feedback
         */
        return repartitionedStream.map(new RichMapFunction<Tuple2<LoadEvent, R>, R>() {
            RedisClusterClient redisClient;
            StatefulRedisClusterConnection<String, String> connection;
            RedisClusterCommands<String, String> syncCommands;
            ScheduledExecutorService scheduledExecutorService;
            volatile Long avgLatency;
            volatile String id;
            Long lastReport;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                redisClient = RedisClusterClientFactory.createClient(REDIS_ADDR);
                connection = redisClient.connect();
                syncCommands = connection.sync();
                scheduledExecutorService = Executors.newScheduledThreadPool(1);
                // periodically report this subtask's average latency back to Redis
                scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            if (id != null && lastReport != null) {
                                syncCommands.hset(loadGroup, id, String.valueOf(avgLatency));
                            }
                            lastReport = System.currentTimeMillis();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }, 3, 3, TimeUnit.SECONDS);
            }

            @Override
            public R map(Tuple2<LoadEvent, R> value) throws Exception {
                // the loadId stamped by the controller identifies this subtask's partition
                if (id == null) id = value.f0.getId();
                // time between the controller stamping the record and this subtask processing it
                long currentLatency = System.currentTimeMillis() - value.f0.getEventTime();
                avgLatency = avgLatency == null ? currentLatency : (currentLatency + avgLatency) / 2;
                return value.f1;
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (scheduledExecutorService != null) scheduledExecutorService.shutdown();
                if (redisClient != null) redisClient.shutdown();
            }
        }).setParallelism(parallelism).name("load-balance-feedback").slotSharingGroup(slotSharingGroup);
    }
    private static class LoadEvent implements Serializable {
        private String group;
        private String id;
        private Long eventTime;

        public LoadEvent(String group, String id, Long eventTime) {
            this.group = group;
            this.id = id;
            this.eventTime = eventTime;
        }

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public Long getEventTime() {
            return eventTime;
        }

        public void setEventTime(Long eventTime) {
            this.eventTime = eventTime;
        }
    }
}
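Wiring this into the job looks roughly as follows (a hypothetical sketch: MyKafkaSource, EnrichViaRpc and MySink are placeholders for the real job's source, RPC-enrichment operator and sink):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// placeholder for the real Kafka source
DataStream<String> source = env.addSource(new MyKafkaSource());

// route records by measured subtask latency instead of plain round-robin rebalance
DataStream<String> balanced = DynamicLoadBalancing.rebalance(source, 400, "etl-slow-group");

balanced.map(new EnrichViaRpc())   // the operator with the expensive RPC call
        .addSink(new MySink());

env.execute("kafka-etl-job");

Per the code above, downstream subtasks report their average latency every 3 seconds and the controller nudges each weight by ±0.05 every 5 seconds, so the routing shifts gradually instead of reacting to every momentary spike.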
5. What are the problems with this workaround?
For stateless operators this kind of load balancing works fine, but in stateful jobs changing the partitioning also means migrating the corresponding state, which is a considerably more complex change; there are already some discussions of this online that are worth referring to.