如何将地图 List<Map<String, String>> myList 列表转换为 Java 中的 Spark Dataframe?

Posted

技术标签:

【中文标题】如何将地图 List<Map<String, String>> myList 列表转换为 Java 中的 Spark Dataframe?【英文标题】:How can I convert a list of map List<Map<String, String>> myList to Spark Dataframe in Java? 【发布时间】:2021-02-02 07:34:19 【问题描述】:

我有一个这样的地图列表,

List<Map<String, Object>> myList = new ArrayList<>();

Map<String, Object> mp1 = new HashMap<>();
mp1.put("id", 1);
mp1.put("name", "John");

Map<String, Object> mp2 = new HashMap<>();
mp2.put("id", 2);
mp2.put("name", "Carte");

我们放入地图的键值对不是固定的,我们可以有任何动态键值对(动态模式)。

我想把它转换成 spark 数据框。 (数据集行>)。

+--+--------+ |编号 |名称 | +--+--------+ | 1 |约翰 | +--+--------+ | 2 |点菜 | +--+--------+

如何做到这一点?

注意:正如我所说,键值对是动态的,我不能提前创建java bean并使用以下语法。

Dataset<Row> ds = spark.createDataFrame(myList, MyClass.class);

【问题讨论】:

【参考方案1】:

您可以从地图列表中构建行和架构,然后使用spark.createDataFrame(rows: java.util.List[Row], schema: StructType) 构建您的数据框:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.*;

...

public static Dataset<Row> buildDataframe(List<Map<String, Object>> listOfMaps, SparkSession spark) 
  // extract columns name list
  Set<String> columnSet = new HashSet<>();
  for (Map<String, Object> elem: listOfMaps) 
    columnSet.addAll(elem.keySet());
  
  List<String> columns = new ArrayList<>(columnSet);

  // build rows
  List<Row> rows = new ArrayList<>();
  for (Map<String, Object> elem : listOfMaps) 
    List<Object> row = new ArrayList<>();
    for (String key: columns) 
      row.add(elem.get(key));
    
    rows.add(new GenericRow(row.toArray()));
  

  // build schema
  List<StructField> fields = new ArrayList<>();
  for (String column: columns) 
    fields.add(new StructField(column, getDataType(column, listOfMaps), true, Metadata.empty()));
  
  StructType schema = new StructType(fields.toArray(new StructField[0]));

  // build dataframe from rows and schema
  return spark.createDataFrame(rows, schema);



public static DataType getDataType(String column, List<Map<String, Object>> data) 
  for (Map<String, Object> elem : data) 
    if (elem.get(column) != null) 
      return getDataType(elem.get(column));
    
  
  return DataTypes.NullType;


public static DataType getDataType(Object value) 
  if (value.getClass() == Integer.class) 
    return DataTypes.IntegerType;
   else if (value.getClass() == String.class) 
    return DataTypes.StringType;
    // TODO add all other spark types (Long, Timestamp, etc...)
   else 
    throw new IllegalArgumentException("unknown type for value " + value);
  

【讨论】:

以上是关于如何将地图 List<Map<String, String>> myList 列表转换为 Java 中的 Spark Dataframe?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 dart 的地图中使用和存储列表?

Flutter - 如何更新 List<Map>

List<Map<String, String>> vs List<?扩展地图<字符串,字符串>>

Java - 投射地图

如何在 Java Stream 中将 POJO 的列表转换为 Map<String,List>?

如何使用 Get Storage 保存和检索 List<Map>