Spark惰性转换执行障碍
Posted
技术标签:
【中文标题】Spark惰性转换执行障碍【英文标题】:Spark lazy transformation execution disorder 【发布时间】:2014-12-02 17:53:14 【问题描述】:我正在研究 SparkSQL。我使用 JavaPairRDD 从 HBase 获取数据,然后做了一个映射。在地图中,我将所有键保存到一个集合中。为了强制完成此映射,请遵循 collect()。 在此之后,我使用 Set 中的值进行其他操作。
这个程序可以在我的本地电脑上完美运行。但是当我把它放到集群上(2个工人)时,就会出现执行障碍。在map变换之前,先执行Set操作。
代码流程如下: 从 Hbase 获取数据:
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(hbase_conf,
TableInputFormat.class, ImmutableBytesWritable.class,
Result.class);
转换数据:
JavaRDD<Map<String, String>> data = hBaseRDD.map(
new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>()
public Map<String, String> call(
Tuple2<ImmutableBytesWritable, Result> re)
throws Exception
byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload"));
Map<String, String> map = new ConcurrentHashMap<String, String>();
String primaryKey = new String(re._1().get());
map.put("primaryKey", primaryKey);
if(payload != null)
map.put("payload", new String(payload));
Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo"));
if(tmpMetaMap != null)
for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet())
String tmpKey = Bytes.toString(entry.getKey());
String tmpValue = Bytes.toString(entry.getValue());
map.put(tmpKey, tmpValue);
//save result to the set
keySet.add(tmpKey);
return map;
);
强制上面的地图运行:
data.collect();
获取Set的结果:
StringBuilder sb = new StringBuilder();
for(String fieldName: keySet)
sb.append(fieldName).append(",");
当我在本地运行代码时,我可以获得所有结果。但是当我在集群上运行时,sb没有任何价值。
【问题讨论】:
【参考方案1】:此问题与操作的顺序无关,而是与集群中何处发生此类操作有关。
在 Spark 中,有两种类型的操作:转换和动作。
Transformations 通过对内容应用一些函数将 RDD 转换为另一个 RDD。这是一种纯函数式方法,没有副作用。 动作采用 RDD 并产生其他东西,例如文件或本地数据结构:这些操作将 RDD 的数据具体化为其他形式。
在这种情况下,转换函数:map
与副作用一起使用,因为 keyset
预计会在映射转换期间发生变化。
鉴于keyset
是在转换函数范围之外定义的,它将被序列化并发送给执行程序,但是远程发生的任何突变不会在驱动程序中恢复。
如果我们考虑一下,每个 executor 都会在数据的分区上应用转换,所以无论 `keyset' 以什么结尾,都只是每个分区的部分视图。
对此建模的正确方法是根据 RDD 转换和操作重新定义操作。
从上面的代码中,看起来我们想要将一些输入转换为RDD[Map[String,String]]
,我们有兴趣在驱动程序上收集所有条目中的键集,这些键不是“主键”和“有效负载”结果。
在 Spark 中,这可能类似于:
// data = RDD[Map[String, String]]
// first we get all the keys from all the maps
val keys = data.mapentry => entry.keys
// now we collect that information on the driver
val allKeys = keys.collect
// we transform the resulting array into a set - this will remove duplicates by definition
val allKeySet = allKeys.toSet
// We need still to remove "primaryKey" and "payload"
val keySet = fullKeySet.diff(Set("primaryKey","payload"))
Java 中的代码有点冗长,但结构和思想是相同的。
【讨论】:
另一个问题:为什么本地运行时可以设置keySet值?【参考方案2】:你是如何定义键集的?尝试将其定义为静态或以其他方式使用 foreach
而不是 map
这会将所有数据带到 DriverSide。希望这能回答您的问题
【讨论】:
是的,我将 keySet 定义为最终的静态 HashSet。在 hBaseRDD 之后,我还尝试了一个简单的 foreach。只是一个foreach,没有任何后续。它也不适用于 spark 服务器。以上是关于Spark惰性转换执行障碍的主要内容,如果未能解决你的问题,请参考以下文章