将Java中的地图列表转换为火花中的数据集
Posted
技术标签:
【中文标题】将Java中的地图列表转换为火花中的数据集【英文标题】:Convert a List of Map in Java to Dataset in spark 【发布时间】:2019-08-05 20:38:22 【问题描述】:我在 java 中有一个 Map 列表,基本上代表行。
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
我正在尝试从中创建一个 Spark DataFrame。
我尝试将其转换为 JavaRDD<Map<String, Object>>
使用
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
但我不知道如何从这里到Dataset<Row>
。我见过 Scala 的例子,但在 Java 中没有。
我也尝试将列表转换为 JSON 字符串,并读取 JSON 字符串。
String jsonStr = mapper.writeValueAsString(dataList);
但似乎我必须将其写入文件然后使用
Dataset<Row> df = spark.read().json(pathToFile);
如果可能的话,我宁愿在内存中进行,而不是写入文件并从那里读取。
SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
.set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).getOrCreate();
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
ObjectMapper mapper = new ObjectMapper();
String jsonStr = mapper.writeValueAsString(dataList);
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();
【问题讨论】:
dataframe/dataset 是柱状结构。您希望地图行关联的列(或列)的值是多少?顺便说一句,您是否尝试过“createDataFrame(rows, Map.class)”?结果如何? 【参考方案1】:您根本不需要使用 RDD。您需要做的是从地图列表中提取所需的架构,将地图列表转换为行列表,然后使用spark.createDataFrame
。
在 java 中,这有点痛苦,尤其是在创建 Row
对象时,但它是这样的:
List<String> cols = new ArrayList(dataList.get(0).keySet());
List<Row> rows = dataList
.stream()
.map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
.map(row -> row.collect(Collectors.toList()))
.map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
.map(Row$.MODULE$::fromSeq)
.collect(Collectors.toList());
StructType schema = new StructType(
cols.stream()
.map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
.collect(Collectors.toList())
.toArray(new StructField[0])
);
Dataset<Row> result = spark.createDataFrame(rows, schema);
【讨论】:
【参考方案2】:public class MyRow implements Serializable
private String fund;
private String broker;
private int qty;
public MyRow(String fund, String broker, int qty)
super();
this.fund = fund;
this.broker = broker;
this.qty = qty;
public String getFund()
return fund;
public void setFund(String fund)
this.fund = fund;
public String getBroker()
return broker;
public void setBroker(String broker)
this.broker = broker;
public int getQty()
return qty;
public void setQty(int qty)
this.qty = qty;
现在创建一个 ArrayList。此列表中的每个项目都将作为最终数据框中的行。
MyRow r1 = new MyRow("f1", "b1", 100);
MyRow r2 = new MyRow("f2", "b2", 200);
List<MyRow> dataList = new ArrayList<>();
dataList.add(r1);
dataList.add(r2);
现在我们必须把这个 List 转换成一个 DataSet -
Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);
ds.show()
【讨论】:
不幸的是,MyRow 的结构是动态变化的,所以我需要能够动态地做到这一点。 @gargravarr。我也有动态架构,你是怎么解决的? @gargravarr 任何人都可以为动态模式做到这一点吗?【参考方案3】:spark文档已经指出如何加载内存中的json字符串。
这是来自https://spark.apache.org/docs/latest/sql-data-sources-json.html的示例
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"\"name\":\"Yin\",\"address\":\"city\":\"Columbus\",\"state\":\"Ohio\"");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
希望对您有所帮助。
【讨论】:
【参考方案4】:import org.apache.spark.api.java.function.Function;
private static JavaRDD<Map<String, Object>> rows;
private static final Function f = (Function<Map<String, Object>, Row>) strObjMap -> RowFactory.create(new TreeMap<String, Object>(strObjMap).values().toArray(new Object[0]));
public void test()
rows = sc.parallelize(list);
JavaRDD<Row> rowRDD = rows.map(f);
Map<String, Object> headMap = list.get(0);
TreeMap<String, Object> headerMap = new TreeMap<>(headMap);
List<StructField> fields = new ArrayList<>();
StructField field;
for (String key : headerMap.keySet())
System.out.println("key:::"+key);
Object value = list.get(0).get(key);
if (value instanceof Integer)
field = DataTypes.createStructField(key, DataTypes.IntegerType, true);
else if (value instanceof Double)
field = DataTypes.createStructField(key, DataTypes.DoubleType, true);
else if (value instanceof Date || value instanceof java.util.Date)
field = DataTypes.createStructField(key, DataTypes.DateType, true);
else
field = DataTypes.createStructField(key, DataTypes.StringType, true);
fields.add(field);
StructType struct = DataTypes.createStructType(fields);
Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);
【讨论】:
以上是关于将Java中的地图列表转换为火花中的数据集的主要内容,如果未能解决你的问题,请参考以下文章