Merge two datasets with different column names in Apache Spark

Posted: 2017-05-03 07:27:19

Question:

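We need to merge two datasets that have different column names. The two datasets have no column in common.

We tried two approaches, and neither of them worked. How can we combine the two datasets using Apache Spark with Java?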

Input dataset 1

"405-048011-62815", "CRC Industries",

"630-0746","Dixon value",

"4444-444","3M INdustries",

"555-55","Dixon coupling valve"

Input dataset 2

"222-2222-5555", "Tata",

"7777-88886","WestSide",

"22222-22224","Reliance",

"33333-3333","V industries"

Expected output:

    +----------------+--------------+-------------+---------+
    |          label1|     sentence1|       label2|sentence2|
    +----------------+--------------+-------------+---------+
    |405-048011-62815|CRC Industries|222-2222-5555|     Tata|
    |        630-0746|   Dixon value|   7777-88886| WestSide|
    +----------------+--------------+-------------+---------+
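The expected output appears to pair the rows by position: the first selected row of dataset 1 with the first selected row of dataset 2, and so on. As a hedged illustration of just those semantics, without Spark, the plain-Java sketch below keeps the wanted labels in their original order (the role of `isin` in the question's filter) and then zips the two filtered lists row by row. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class PositionalPairing {

    // Keep rows whose first field (the label) is in `wanted`, preserving input order.
    // This mirrors filter(col("label1").isin(...)) in the question.
    static List<List<String>> filterByLabel(List<List<String>> rows, Set<String> wanted) {
        List<List<String>> kept = new ArrayList<>();
        for (List<String> row : rows) {
            if (wanted.contains(row.get(0))) {
                kept.add(row);
            }
        }
        return kept;
    }

    // Pair row i of `left` with row i of `right` and concatenate their fields.
    // Stops at the shorter list, so unmatched trailing rows are dropped.
    static List<List<String>> zipRows(List<List<String>> left, List<List<String>> right) {
        List<List<String>> out = new ArrayList<>();
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            List<String> combined = new ArrayList<>(left.get(i));
            combined.addAll(right.get(i));
            out.add(combined);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> ds1 = List.of(
                List.of("405-048011-62815", "CRC Industries"),
                List.of("630-0746", "Dixon value"),
                List.of("4444-444", "3M INdustries"),
                List.of("555-55", "Dixon coupling valve"));
        List<List<String>> ds2 = List.of(
                List.of("222-2222-5555", "Tata"),
                List.of("7777-88886", "WestSide"),
                List.of("22222-22224", "Reliance"),
                List.of("33333-3333", "V industries"));

        List<List<String>> left = filterByLabel(ds1, Set.of("405-048011-62815", "630-0746"));
        List<List<String>> right = filterByLabel(ds2, Set.of("222-2222-5555", "7777-88886"));
        zipRows(left, right).forEach(System.out::println);
    }
}
```

Running `main` prints `[405-048011-62815, CRC Industries, 222-2222-5555, Tata]` and `[630-0746, Dixon value, 7777-88886, WestSide]`, which match the two expected rows.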


    // Imports used by this snippet; it runs inside a method where `spark` is an existing SparkSession
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.concat;

    // Dataset 1: (label1, sentence1)
    List<Row> data = Arrays.asList(
            RowFactory.create("405-048011-62815", "CRC Industries"),
            RowFactory.create("630-0746", "Dixon value"),
            RowFactory.create("4444-444", "3M INdustries"),
            RowFactory.create("555-55", "Dixon coupling valve"));

    StructType schema = new StructType(new StructField[]{
            new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
            new StructField("sentence1", DataTypes.StringType, false, Metadata.empty())});

    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

    // Keep only the wanted labels from dataset 1
    List<String> listStrings = new ArrayList<String>();
    listStrings.add("405-048011-62815");
    listStrings.add("630-0746");

    Dataset<Row> matchFound1 = sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
    matchFound1.show();

    // Reuse the list for the wanted labels from dataset 2
    listStrings.clear();
    listStrings.add("222-2222-5555");
    listStrings.add("7777-88886");

    // Dataset 2: (label2, sentence2)
    List<Row> data2 = Arrays.asList(
            RowFactory.create("222-2222-5555", "Tata"),
            RowFactory.create("7777-88886", "WestSide"),
            RowFactory.create("22222-22224", "Reliance"),
            RowFactory.create("33333-3333", "V industries"));

    StructType schema2 = new StructType(new StructField[]{
            new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
            new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())});

    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

    Dataset<Row> matchFound2 = sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
    matchFound2.show();

    // Approach 1: select matchFound2's columns from matchFound1 (fails with the first error below)
    Dataset<Row> matchFound3 = matchFound1.select(matchFound1.col("label1"), matchFound1.col("sentence1"),
            matchFound2.col("label2"), matchFound2.col("sentence2"));
    System.out.println("After concat");
    matchFound3.show();

    // Approach 2: filter matchFound1 on a concat of both datasets' columns (fails with the second error below)
    Dataset<Row> matchFound4 = matchFound1.filter(concat(col("label1"), matchFound1.col("sentence1"),
            matchFound2.col("label2"), matchFound2.col("sentence2")));
    System.out.println("After concat 2");
    matchFound4.show();

The errors for each approach are as follows.

Approach 1 error:

    org.apache.spark.sql.AnalysisException: resolved attribute(s) label2#10,sentence2#11 missing from label1#0,sentence1#1 in operator !Project [label1#0, sentence1#1, label2#10, sentence2#11];;
    !Project [label1#0, sentence1#1, label2#10, sentence2#11]
    +- Filter label1#0 IN (405-048011-62815,630-0746)
       +- LocalRelation [label1#0, sentence1#1]

Approach 2 error:

    org.apache.spark.sql.AnalysisException: filter expression 'concat(`label1`, `sentence1`, `label2`, `sentence2`)' of type string is not a boolean.;;
    !Filter concat(label1#0, sentence1#1, label2#10, sentence2#11)
    +- Filter label1#0 IN (405-048011-62815,630-0746)
       +- LocalRelation [label1#0, sentence1#1]
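Both errors share a root cause: an operation on `matchFound1` can only resolve columns from `matchFound1`'s own logical plan, so `matchFound2`'s columns cannot be referenced there without a join (the "resolved attribute(s) ... missing" message). Approach 2 also passes a string-valued `concat` to `filter`, which needs a boolean condition. With no shared column, a join key has to be manufactured, for example by giving each row its position. In Spark, `JavaRDD.zipWithIndex()` assigns consecutive `Long` indices, ordered first by partition and then by position within the partition. The plain-Java sketch below models that idea as an inner hash join on the generated index. It does not use Spark, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IndexJoin {

    // Give every row its position as a key, in the spirit of zipWithIndex (index -> row).
    // LinkedHashMap keeps the original row order.
    static Map<Long, List<String>> withIndex(List<List<String>> rows) {
        Map<Long, List<String>> indexed = new LinkedHashMap<>();
        long i = 0;
        for (List<String> row : rows) {
            indexed.put(i++, row);
        }
        return indexed;
    }

    // Inner equi-join on the generated index: probe the right-hand map with each left
    // index and concatenate the two rows; rows without a partner are dropped.
    static List<List<String>> joinOnIndex(List<List<String>> left, List<List<String>> right) {
        Map<Long, List<String>> rightByIndex = withIndex(right);
        List<List<String>> joined = new ArrayList<>();
        for (Map.Entry<Long, List<String>> e : withIndex(left).entrySet()) {
            List<String> match = rightByIndex.get(e.getKey());
            if (match != null) {
                List<String> combined = new ArrayList<>(e.getValue());
                combined.addAll(match);
                joined.add(combined);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<List<String>> left = List.of(
                List.of("405-048011-62815", "CRC Industries"),
                List.of("630-0746", "Dixon value"));
        List<List<String>> right = List.of(
                List.of("222-2222-5555", "Tata"),
                List.of("7777-88886", "WestSide"),
                List.of("22222-22224", "Reliance"));
        // The third right-hand row (index 2) has no left-hand partner and is dropped.
        joinOnIndex(left, right).forEach(System.out::println);
    }
}
```

The design choice is deliberate: a join on a key generated the same way on both sides reproduces positional pairing. The same holds in Spark only if both indices really follow the intended row order.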


Answer 1:

Hope this works for you.

Create the DataFrames:

    // Assumes spark-shell, where `sc` and `spark.implicits._` (needed for toDF) are already in scope
    val pre: Array[String] = Array("CRC Industries", "Dixon value", "3M INdustries", "Dixon coupling valve")
    val rea: Array[String] = Array("405048011-62815", "630-0746", "4444-444", "555-55")
    val df1 = sc.parallelize(rea zip pre).toDF("label1", "sentence1")

    val preasons2: Array[String] = Array("Tata", "WestSide", "Reliance", "V industries")
    val reasonsI2: Array[String] = Array("222-2222-5555", "7777-88886", "22222-22224", "33333-3333")
    val df2 = sc.parallelize(reasonsI2 zip preasons2).toDF("label2", "sentence2")

StringIndexer:

    import org.apache.spark.ml.feature.StringIndexer

    // Map each distinct label1 value to a numeric index in a new column, label1Index
    val indexer = new StringIndexer()
      .setInputCol("label1")
      .setOutputCol("label1Index")

    val indexed = indexer.fit(df1).transform(df1)
    indexed.show()

    // Do the same for label2 in the second DataFrame
    val indexer1 = new StringIndexer()
      .setInputCol("label2")
      .setOutputCol("label2Index")

    val indexed1 = indexer1.fit(df2).transform(df2)
    indexed1.show()

Join:

    // Join on the two generated indices, then drop the helper columns
    val rnd_reslt12 = indexed.join(indexed1, indexed.col("label1Index") === indexed1.col("label2Index")).drop(indexed.col("label1Index")).drop(indexed1.col("label2Index"))
    rnd_reslt12.show()

Output:

    +---------------+--------------------+-------------+------------+
    |         label1|           sentence1|       label2|   sentence2|
    +---------------+--------------------+-------------+------------+
    |       630-0746|         Dixon value|222-2222-5555|        Tata|
    |       4444-444|       3M INdustries|  22222-22224|    Reliance|
    |         555-55|Dixon coupling valve|   33333-3333|V industries|
    |405048011-62815|      CRC Industries|   7777-88886|    WestSide|
    +---------------+--------------------+-------------+------------+
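A caveat on this approach: `StringIndexer` is a feature encoder, not a row counter. Under its default `stringOrderType` of `frequencyDesc`, it numbers labels by descending frequency, and the Spark 3.x documentation states that ties are then broken alphabetically. Every label here occurs once, so the index is effectively a rank of the label string. Joining on `label1Index = label2Index` therefore pairs rows by rank, not by their original position. The output above shows this: CRC Industries is paired with WestSide rather than with Tata as in the expected output. The Spark release used for the answer presumably did not order ties alphabetically. The plain-Java sketch below is an approximation of the documented ordering for illustration, not Spark's implementation, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FrequencyDescIndexer {

    // Model of frequencyDesc ordering: the most frequent label gets 0.0,
    // and labels with equal frequency are ordered alphabetically.
    static Map<String, Double> fit(List<String> labels) {
        Map<String, Long> counts = new HashMap<>();
        for (String label : labels) {
            counts.merge(label, 1L, Long::sum);
        }
        List<String> ordered = new ArrayList<>(counts.keySet());
        ordered.sort((a, b) -> {
            int byCount = Long.compare(counts.get(b), counts.get(a)); // higher count first
            return byCount != 0 ? byCount : a.compareTo(b);           // ties: alphabetical
        });
        Map<String, Double> index = new HashMap<>();
        for (int i = 0; i < ordered.size(); i++) {
            index.put(ordered.get(i), (double) i);
        }
        return index;
    }

    // Inner join of two label lists on equal index, reported as "left -> right"
    // in the left list's original order.
    static List<String> pairByIndex(List<String> left, List<String> right) {
        Map<String, Double> leftIndex = fit(left);
        Map<String, Double> rightIndex = fit(right);
        Map<Double, String> rightByIndex = new HashMap<>();
        for (String r : right) {
            rightByIndex.put(rightIndex.get(r), r);
        }
        List<String> pairs = new ArrayList<>();
        for (String l : left) {
            String r = rightByIndex.get(leftIndex.get(l));
            if (r != null) {
                pairs.add(l + " -> " + r);
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Hypothetical labels chosen so that alphabetical rank differs from position.
        // Positional pairing would be b-label -> x-label and a-label -> y-label.
        List<String> left = List.of("b-label", "a-label");
        List<String> right = List.of("x-label", "y-label");
        System.out.println(pairByIndex(left, right)); // [b-label -> y-label, a-label -> x-label]
    }
}
```

With the question's data the rank order happens to coincide with the expected pairing, but nothing in `StringIndexer` guarantees that in general. A positional index such as the one from `zipWithIndex` expresses the intent more directly.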

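Comments:

I already mentioned that there is no common column, so we cannot use df_reason1.col("id")===df_reason2.col("code").

You can create an index, apply it to each DataFrame, and then perform the join. Check now whether this is what you want.

Answer 2:

Here is the StringIndexer approach, done in Java. This works.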

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.ml.feature.StringIndexer;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import static org.apache.spark.sql.functions.col;

    public class StringIndexer11 {

        public static void main(String[] args) {
            try {
                // Only needed on Windows: directory containing winutils for the local Hadoop libraries
                System.setProperty("hadoop.home.dir", "D:\\AI matching\\winutil");
                SparkSession spark = SparkSession.builder()
                        .appName("JavaTokenizerExample")
                        .master("local[*]")
                        .getOrCreate();

                // Dataset 1: (label1, sentence1)
                List<Row> data = Arrays.asList(
                        RowFactory.create("405-048011-62815", "CRC Industries"),
                        RowFactory.create("630-0746", "Dixon value"),
                        RowFactory.create("4444-444", "3M INdustries"),
                        RowFactory.create("555-55", "Dixon coupling valve"));

                StructType schema = new StructType(new StructField[]{
                        new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
                        new StructField("sentence1", DataTypes.StringType, false, Metadata.empty())});

                Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

                // Keep only the wanted labels from dataset 1
                List<String> listStrings = new ArrayList<>();
                listStrings.add("405-048011-62815");
                listStrings.add("630-0746");
                Dataset<Row> matchFound1 = sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
                matchFound1.show();

                // Index label1; the index is used only as a join key
                StringIndexer indexer = new StringIndexer()
                        .setInputCol("label1")
                        .setOutputCol("label1Index");
                Dataset<Row> dataset1 = indexer.fit(matchFound1).transform(matchFound1);

                // Dataset 2: (label2, sentence2)
                List<Row> data2 = Arrays.asList(
                        RowFactory.create("222-2222-5555", "Tata"),
                        RowFactory.create("7777-88886", "WestSide"),
                        RowFactory.create("22222-22224", "Reliance"),
                        RowFactory.create("33333-3333", "V industries"));

                StructType schema2 = new StructType(new StructField[]{
                        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
                        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())});

                Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

                // Keep only the wanted labels from dataset 2
                listStrings.clear();
                listStrings.add("222-2222-5555");
                listStrings.add("7777-88886");
                Dataset<Row> matchFound2 = sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
                matchFound2.show();

                // Index label2 the same way
                StringIndexer indexer1 = new StringIndexer()
                        .setInputCol("label2")
                        .setOutputCol("label2Index");
                Dataset<Row> dataset2 = indexer1.fit(matchFound2).transform(matchFound2);

                // Join on the two generated indices, then drop the helper columns
                Dataset<Row> finalResult = dataset1
                        .join(dataset2, dataset1.col("label1Index").equalTo(dataset2.col("label2Index")))
                        .drop(dataset1.col("label1Index"))
                        .drop(dataset2.col("label2Index"));
                finalResult.show();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
