将Java中的地图列表转换为火花中的数据集

Posted

技术标签:

【中文标题】将Java中的地图列表转换为火花中的数据集【英文标题】:Convert a List of Map in Java to Dataset in spark 【发布时间】:2019-08-05 20:38:22 【问题描述】:

我在 java 中有一个 Map 列表,基本上代表行。

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);

Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);

dataList.add(row1);
dataList.add(row2);

我正在尝试从中创建一个 Spark DataFrame。

我尝试将其转换为 JavaRDD&lt;Map&lt;String, Object&gt;&gt; 使用

JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);

但我不知道如何从这里到Dataset&lt;Row&gt;。我见过 Scala 的例子,但在 Java 中没有。

我也尝试将列表转换为 JSON 字符串,并读取 JSON 字符串。

String jsonStr = mapper.writeValueAsString(dataList);

但似乎我必须将其写入文件然后使用

Dataset<Row> df = spark.read().json(pathToFile);

如果可能的话,我宁愿在内存中进行,而不是写入文件并从那里读取。

SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
            .set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
    SparkSession sparkSession = 
SparkSession.builder().config(sparkConf).getOrCreate();

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);

Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);

dataList.add(row1);
dataList.add(row2);

ObjectMapper mapper = new ObjectMapper();
    
String jsonStr = mapper.writeValueAsString(dataList);
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();

【问题讨论】:

dataframe/dataset 是柱状结构。您希望地图行关联的列(或列)的值是多少?顺便说一句,您是否尝试过“createDataFrame(rows, Map.class)”?结果如何? 【参考方案1】:

您根本不需要使用 RDD。您需要做的是从地图列表中提取所需的架构,将地图列表转换为行列表,然后使用spark.createDataFrame

在 java 中,这有点痛苦,尤其是在创建 Row 对象时,但它是这样的:

List<String> cols = new ArrayList(dataList.get(0).keySet());
List<Row> rows = dataList
    .stream()
    .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
    .map(row -> row.collect(Collectors.toList()))
    .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
    .map(Row$.MODULE$::fromSeq)
    .collect(Collectors.toList());

StructType schema = new StructType(
    cols.stream()
        .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
        .collect(Collectors.toList())
        .toArray(new StructField[0])
);
Dataset<Row> result = spark.createDataFrame(rows, schema);

【讨论】:

【参考方案2】:
public class MyRow implements Serializable 

  private String fund;
  private String broker;
  private int qty;

  public MyRow(String fund, String broker, int qty) 
    super();
    this.fund = fund;
    this.broker = broker;
    this.qty = qty;
  

  public String getFund() 
    return fund;
  

  public void setFund(String fund) 
    this.fund = fund;
  


  public String getBroker() 
    return broker;
  

  public void setBroker(String broker) 
    this.broker = broker;
  

  public int getQty() 
    return qty;
  

  public void setQty(int qty) 
    this.qty = qty;
  


现在创建一个 ArrayList。此列表中的每个项目都将作为最终数据框中的行。

MyRow r1 = new MyRow("f1", "b1", 100);
MyRow r2 = new MyRow("f2", "b2", 200);
List<MyRow> dataList = new ArrayList<>();
dataList.add(r1);
dataList.add(r2);

现在我们必须把这个 List 转换成一个 DataSet -

Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);
ds.show()

【讨论】:

不幸的是,MyRow 的结构是动态变化的,所以我需要能够动态地做到这一点。 @gargravarr。我也有动态架构,你是怎么解决的? @gargravarr 任何人都可以为动态模式做到这一点吗?【参考方案3】:

spark文档已经指出如何加载内存中的json字符串。

这是来自https://spark.apache.org/docs/latest/sql-data-sources-json.html的示例

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "\"name\":\"Yin\",\"address\":\"city\":\"Columbus\",\"state\":\"Ohio\"");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

希望对您有所帮助。

【讨论】:

【参考方案4】:
import org.apache.spark.api.java.function.Function;
private static JavaRDD<Map<String, Object>> rows;
private static final Function f = (Function<Map<String, Object>, Row>) strObjMap -> RowFactory.create(new TreeMap<String, Object>(strObjMap).values().toArray(new Object[0]));
public void test()
    rows = sc.parallelize(list);
    JavaRDD<Row> rowRDD = rows.map(f);
    Map<String, Object> headMap = list.get(0);
    TreeMap<String, Object> headerMap = new TreeMap<>(headMap);
    List<StructField> fields = new ArrayList<>();
    StructField field;
    for (String key : headerMap.keySet()) 
        System.out.println("key:::"+key);
        Object value = list.get(0).get(key);
        if (value instanceof Integer) 
            field = DataTypes.createStructField(key, DataTypes.IntegerType, true);
        
        else if (value instanceof Double) 
            field = DataTypes.createStructField(key, DataTypes.DoubleType, true);
        
        else if (value instanceof Date || value instanceof java.util.Date) 
            field = DataTypes.createStructField(key, DataTypes.DateType, true);
        
        else 
            field = DataTypes.createStructField(key, DataTypes.StringType, true);
        
            fields.add(field);
    
    StructType struct = DataTypes.createStructType(fields);
    Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);

【讨论】:

以上是关于将Java中的地图列表转换为火花中的数据集的主要内容,如果未能解决你的问题,请参考以下文章

如何在不转换为火花数据集的情况下遍历数据框?

将火花数据帧动态转换为元组数据集(字符串,_<:产品)

火花数据集的转换

如何使用 mapPartitions 函数将 Rdd 转换为数据集

在火花中加入大小不等的数据集

如何在 Spark Java 中的数据集上应用地图功能