Spark广播机制解析

1. 使用场景

在spark开发过程中,广播变量会被分发到Executor,当Executor执行多个Task时会复用广播变量。所以广播变量最常见的使用场景有两个:

  1. 共享重资源:重资源的初始化十分消耗性能(例如:各类存储系统的客户端和连接池等),如果不使用广播可能造成频繁的初始化而影响性能,即使基于分区创建连接,在分区数较多的情况下也会占用大量资源。
  2. 共享只读数据:多次使用的只读数据可以使用广播发送到每个节点,避免重复传输,起到性能优化的作用。

2. 实现方式

广播的实现分为几个步骤:

  1. 序列化:由Driver将广播变量序列化,生成chunks保存在Driver的BlockManager中。
  2. 传输读取:Executor在使用广播变量时首先尝试从本地BlockManager中获取数据,如果获取失败则从Driver或其他Executor的BlockManager获取,最终在首次使用时将数据反序列化成对象。

传输实现有两种方式:HttpBroadcast所有Executor都从Driver获取广播变量、TorrentBroadcastExecutor之间也可以传输广播变量,避免Driver性能瓶颈。

3.不可序列化对象

由2中描述可知,广播的变量必须是可序列化的否则会抛出NotSerializableException,但是如果想广播不可序列化的对象如何实现呢?这里以Redis客户端为示例。

为了避免这个问题,只需要使用使用一层wrapper类,将不可序列化的类作为懒加载变量即可,即在Driver序列化的时候对象是可序列化的就能满足要求:

import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{HostAndPort, JedisCluster}

//使用RedisSink来封装Redis连接池,避免直接广播JedisCluster
class RedisSink extends Serializable {
  //使用lazy修饰的变量,序列化时不会初始化此变量
  lazy val cluster: JedisCluster = RedisSink.createObj()
}

object RedisSink {
  def createObj() : JedisCluster = {
    new JedisCluster(new HostAndPort("127.0.0.1", 2181))
  }
}

使用广播JedisCluster:

def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("TestRedisSink")
    val sc = new SparkContext(config)
    val rdd = sc.parallelize(Array(1, 2, 3))
    val cluster = sc.broadcast(new TestSink)
    rdd.map(i => {
      //使用广播变量
      cluster.value.cluster.get(i.toString)
    }).count()
  }

这样就可以正常在node中共享一个连接池了。
另外如果客户端有缓冲区还要在程序退出时关闭连接:

val cluster = new JedisCluster(new HostAndPort("127.0.0.1", 2181))
    sys.addShutdownHook {
      cluster.close()
    }
    cluster
}

4. JAVA实现

上面提到用Scala中的懒加载变量实现不可序列化对象的广播,那如果用JAVA开发Spark程序如何实现呢?

其实也可以参考Scala的实现方式,如果一个变量被lazy修饰后:

class TestLazy {
  lazy val a = 0;
}

与之对应的Java代码如下:

public class TestLazy {
  private int a;
  
  private volatile boolean bitmap$0;
  
  private int a$lzycompute() {
    synchronized (this) {
      if (!this.bitmap$0) {
        this.a = 0;
        this.bitmap$0 = true;
      } 
      return this.a;
    } 
  }
  
  public int a() {
    return this.bitmap$0 ? this.a : a$lzycompute();
  }
}

可以看到,对变量a的读取都变成了对a()的调用,只有首次读取a的值时才会触发a的初始化,而且类序列化的过程是会跳过值为null的字段和方法,也就使得任何类型的对象都可以使用这种方式来实现可序列化。