spark groupByKey, combineByKey


On a pair RDD, avoid groupByKey where possible: it ships every value for a key across the network during the shuffle, which can cause performance problems. combineByKey can merge values map-side before the shuffle, so pair RDDs generally use combineByKey instead:
Example:
RDD type before the call: JavaPairRDD<String, HotsCompare>
	pairRdd2 = pairRdd.combineByKey(
			// createCombiner: start a new list for a key's first value
			e -> {
				ArrayList<HotsCompare> list = new ArrayList<>();
				list.add(e);
				return list;
			},
			// mergeValue: fold another value into an existing list (same partition)
			(list, e) -> {
				list.add(e);
				return list;
			},
			// mergeCombiners: merge the lists built on different partitions
			(lista, listb) -> {
				lista.addAll(listb);
				return lista;
			});
RDD type after the call: JavaPairRDD<String, List<HotsCompare>>
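The three functions passed to combineByKey can be modeled locally without Spark. The sketch below is a minimal plain-Java illustration (the `combineByKey` helper, the two "partitions", and the sample data are made up for this sketch, not Spark API): createCombiner runs on a key's first value, mergeValue on later values within a partition, and mergeCombiners across partitions.

```java
import java.util.*;
import java.util.function.*;

public class CombineByKeyModel {

	// Local model of combineByKey over two hypothetical partitions (no Spark).
	static <K, V, C> Map<K, C> combineByKey(
			List<Map.Entry<K, V>> partitionA,
			List<Map.Entry<K, V>> partitionB,
			Function<V, C> createCombiner,
			BiFunction<C, V, C> mergeValue,
			BinaryOperator<C> mergeCombiners) {
		Map<K, C> a = combinePartition(partitionA, createCombiner, mergeValue);
		Map<K, C> b = combinePartition(partitionB, createCombiner, mergeValue);
		// mergeCombiners runs only for keys that appear in both partitions
		b.forEach((k, c) -> a.merge(k, c, mergeCombiners));
		return a;
	}

	static <K, V, C> Map<K, C> combinePartition(
			List<Map.Entry<K, V>> partition,
			Function<V, C> createCombiner,
			BiFunction<C, V, C> mergeValue) {
		Map<K, C> out = new HashMap<>();
		for (Map.Entry<K, V> e : partition) {
			C c = out.get(e.getKey());
			// first value of a key: createCombiner; later values: mergeValue
			out.put(e.getKey(), c == null ? createCombiner.apply(e.getValue())
					: mergeValue.apply(c, e.getValue()));
		}
		return out;
	}

	public static void main(String[] args) {
		List<Map.Entry<String, Integer>> p1 = List.of(Map.entry("a", 1), Map.entry("b", 2));
		List<Map.Entry<String, Integer>> p2 = List.of(Map.entry("a", 3));
		Map<String, List<Integer>> grouped = combineByKey(
				p1, p2,
				v -> new ArrayList<>(List.of(v)),           // createCombiner
				(list, v) -> { list.add(v); return list; }, // mergeValue
				(l1, l2) -> { l1.addAll(l2); return l1; }); // mergeCombiners
		System.out.println(grouped); // a -> [1, 3], b -> [2] (map order may vary)
	}
}
```

Note that list-building combiners like these transfer as much data as groupByKey; the map-side saving only materializes when the combiner actually shrinks the data (counts, sums, top-N, etc.).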

A Dataset's groupByKey() + mapGroups() can replace the pair RDD's combineByKey():
// original schema of df:
StructType flatSchema = DataTypes.createStructType(new StructField[] {
		DataTypes.createStructField("asin", StringType, false),
		DataTypes.createStructField("pathId", StringType, true),
		DataTypes.createStructField("rank", IntegerType, true), });
// schema after the transformation:
StructType returnSchema = DataTypes.createStructType(new StructField[] {
		DataTypes.createStructField("asin", StringType, false),
		DataTypes.createStructField("bsr_ext", DataTypes.createMapType(StringType, IntegerType, true), true) });
// combine all rows sharing one asin into a single pathId -> rank map
df = df.groupByKey(row -> row.<String>getAs("asin"), Encoders.STRING())
		.mapGroups((key, values) -> {
			Map<String, Integer> map = Maps.newHashMap();
			while (values.hasNext()) {
				Row row = values.next();
				String pathId = row.getAs("pathId");
				Integer rank = row.getAs("rank");
				map.put(pathId, rank);
			}
			// asScalaMap converts the Java map into the Scala map the Row expects
			return new GenericRowWithSchema(new Object[] { key, asScalaMap(map) }, returnSchema);
		}, RowEncoder.apply(returnSchema));
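Stripped of Spark, the per-group logic above is just folding each group's rows into a map. A minimal plain-Java sketch of that step (the `Bsr` record and the sample data are hypothetical stand-ins for the Row fields; no Spark required):

```java
import java.util.*;
import java.util.stream.*;

public class GroupToMap {

	// Hypothetical flat record standing in for a Row of (asin, pathId, rank)
	record Bsr(String asin, String pathId, int rank) {}

	// Group by asin, then collapse each group's rows into a pathId -> rank map,
	// mirroring what the mapGroups lambda does for each key.
	static Map<String, Map<String, Integer>> toBsrExt(List<Bsr> rows) {
		return rows.stream().collect(Collectors.groupingBy(
				Bsr::asin,
				Collectors.toMap(Bsr::pathId, Bsr::rank)));
	}

	public static void main(String[] args) {
		List<Bsr> rows = List.of(
				new Bsr("B01", "electronics", 5),
				new Bsr("B01", "cameras", 2),
				new Bsr("B02", "books", 9));
		System.out.println(toBsrExt(rows));
	}
}
```

Unlike the RDD combineByKey example, mapGroups receives each group's rows as a streaming Iterator, so only the per-key result (here, one map per asin) has to fit in memory at once.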
		
