1. 使用场景
在spark开发过程中,广播变量会被分发到Executor,当Executor执行多个Task时会复用广播变量。所以广播变量最常见的使用场景有两个:
共享重资源
:重资源的初始化十分消耗性能(例如:各类存储系统的客户端和连接池等),如果不使用广播可能造成频繁的初始化而影响性能,即使基于分区创建连接,在分区数较多的情况下也会占用大量资源。共享只读数据
:多次使用的只读数据可以使用广播发送到每个节点,避免重复传输,起到性能优化的作用。
2. 实现方式
广播的实现分为几个步骤:
序列化
:由Driver将广播变量序列化,生成chunks保存在Driver的BlockManager中。传输读取
:Executor在使用广播变量时首先尝试从本地BlockManager中获取数据,如果获取失败则从Driver或其他Executor的BlockManager获取,最终在首次使用时将数据反序列化成对象。
传输实现有两种方式:HttpBroadcast
所有Executor都从Driver获取广播变量、TorrentBroadcast
Executor之间也可以传输广播变量,避免Driver性能瓶颈。
3.不可序列化对象
由2中描述可知,广播的变量必须是可序列化的否则会抛出NotSerializableException
,但是如果想广播不可序列化的对象如何实现呢?这里以Redis客户端为示例。
为了避免这个问题,只需要使用使用一层wrapper类,将不可序列化的类作为懒加载变量即可,即在Driver序列化的时候对象是可序列化的就能满足要求:
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{HostAndPort, JedisCluster}
//使用RedisSink来封装Redis连接池,避免直接广播JedisCluster
class RedisSink extends Serializable {
//使用lazy修饰的变量,序列化时不会初始化此变量
lazy val cluster: JedisCluster = RedisSink.createObj()
}
object RedisSink {
def createObj() : JedisCluster = {
new JedisCluster(new HostAndPort("127.0.0.1", 2181))
}
}
使用广播JedisCluster:
def main(args: Array[String]): Unit = {
val config = new SparkConf().setAppName("TestRedisSink")
val sc = new SparkContext(config)
val rdd = sc.parallelize(Array(1, 2, 3))
val cluster = sc.broadcast(new TestSink)
rdd.map(i => {
//使用广播变量
cluster.value.cluster.get(i.toString)
}).count()
}
这样就可以正常在node中共享一个连接池了。
另外如果客户端有缓冲区还要在程序退出时关闭连接:
val cluster = new JedisCluster(new HostAndPort("127.0.0.1", 2181))
sys.addShutdownHook {
cluster.close()
}
cluster
}
4. JAVA实现
上面提到用Scala中的懒加载变量实现不可序列化对象的广播,那如果用JAVA开发Spark程序如何实现呢?
其实也可以参考Scala的实现方式,如果一个变量被lazy修饰后:
class TestLazy {
lazy val a = 0;
}
与之对应的Java代码如下:
public class TestLazy {
private int a;
private volatile boolean bitmap$0;
private int a$lzycompute() {
synchronized (this) {
if (!this.bitmap$0) {
this.a = 0;
this.bitmap$0 = true;
}
return this.a;
}
}
public int a() {
return this.bitmap$0 ? this.a : a$lzycompute();
}
}
可以看到,对变量a的读取都变成了对a()的调用,只有首次读取a的值时才会触发a的初始化,而且类序列化的过程是会跳过值为null的字段和方法,也就使得任何类型的对象都可以使用这种方式来实现可序列化。