在 Apache Spark 中搜索和替换

Posted

技术标签:

【中文标题】在 Apache Spark 中搜索和替换【英文标题】:Search and replace in Apache Spark 【发布时间】:2017-04-17 18:05:23 【问题描述】:

我们创建了两个数据集sentenceDataFrame,sentenceDataFrame2 应该发生搜索替换。

sentenceDataFrame2 存储搜索和替换术语。

我们还执行了所有 11 种类型的连接 'inner'、'outer'、'full'、'fullouter'、'leftouter'、'left'、'rightouter'、'right'、'leftsemi'、'leftanti' , 'cross' 他们都没有给我们结果。

您能否告诉我们我们要去哪里错了,请指出我们正确的方向。

        List<Row> data = Arrays.asList(
            RowFactory.create(0, "Allen jeevi pramod Allen"),
            RowFactory.create(1,"sandesh Armstrong jeevi"),
            RowFactory.create(2,"harsha Nischay DeWALT"));

        StructType schema = new StructType(new StructField[] 
        new StructField("label", DataTypes.IntegerType, false,
          Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false,
          Metadata.empty()) );
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);


        List<Row> data2 = Arrays.asList(
          RowFactory.create("Allen", "Apex Tool Group"),
          RowFactory.create("Armstrong","Apex Tool Group"),
          RowFactory.create("DeWALT","StanleyBlack"));

        StructType schema2 = new StructType(new StructField[] 
        new StructField("label2", DataTypes.StringType, false,
          Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false,
          Metadata.empty()) );
        Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

        Dataset<Row> remainingElements=sentenceDataFrame.join(sentenceDataFrame2,sentenceDataFrame.col("label").equalTo(sentenceDataFrame2.col("label2")),"cross");
        System.out.println("Left anti join count :"+remainingElements.count());

输入

艾伦·吉维·普拉莫德·艾伦 桑德什·阿姆斯特朗·吉维 哈沙·尼夏·德瓦尔特

预期输出

Apex 工具组 jeevi pramod Apex 工具组 山德什 Apex 工具组 jeevi 哈沙·尼夏·斯坦利布莱克

【问题讨论】:

【参考方案1】:

对于不涉及此类简单等式的连接条件,您将需要使用 Spark 用户定义函数 (UDF)。

这是一个 JUnit 代码 sn-p,它不会直接编译,但会显示相关的导入和逻辑。不过,Java API 相当冗长。我将把在 Scala 中执行此操作的问题留给读者作为练习。它会更简洁。

callUDF()col() 方法需要静态导入。

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

@Test
public void testSomething() 
    List<Row> data = Arrays.asList(
        RowFactory.create(0, "Allen jeevi pramod Allen"),
        RowFactory.create(1, "sandesh Armstrong jeevi"),
        RowFactory.create(2, "harsha Nischay DeWALT")
    );

    StructType schema = new StructType(new StructField[] 
        new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) 
    );
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

    List<Row> data2 = Arrays.asList(
        RowFactory.create("Allen", "Apex Tool Group"),
        RowFactory.create("Armstrong","Apex Tool Group"),
        RowFactory.create("DeWALT","StanleyBlack")
    );

    StructType schema2 = new StructType(new StructField[] 
        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) 
    );
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

    UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() 
        private static final long serialVersionUID = -5239951370238629896L;

        @Override
        public Boolean call(String t1, String t2) throws Exception 
            return t1.contains(t2);
        
    ;
    spark.udf().register("contains", contains, DataTypes.BooleanType);

    UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() 
        private static final long serialVersionUID = -2882956931420910207L;

        @Override
        public String call(String t1, String t2, String t3) throws Exception 
            return t1.replaceAll(t2, t3);
        
    ;
    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);

    Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
                                           .withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
                                           .select(col("sentence_replaced"));

    joined.show(false);

输出:

+--------------------------------------------+
|sentence_replaced                           |
+--------------------------------------------+
|Apex Tool Group jeevi pramod Apex Tool Group|
|sandesh Apex Tool Group jeevi               |
|harsha Nischay StanleyBlack                 |
+--------------------------------------------+

【讨论】:

【参考方案2】:

我们可以使用 replaceAll 和 UDF 函数来实现预期的输出。

public class Test 

    public static void main(String[] args) 
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

        List<Row> data = Arrays.asList(
        RowFactory.create(0, "Allen jeevi pramod Allen"),
        RowFactory.create(1, "sandesh Armstrong jeevi"),
        RowFactory.create(2, "harsha Nischay DeWALT")
    );

        StructType schema = new StructType(new StructField[] 
        new StructField("label", DataTypes.IntegerType, false,
                Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false,
                Metadata.empty()) );
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
        UDF1 mode = new UDF1<String, String>() 
            public String call(final String types) throws Exception 
                return types.replaceAll("Allen", "Apex Tool Group")
                .replaceAll("Armstrong","Apex Tool Group")
                .replaceAll(""DeWALT","StanleyBlack"")
            
        ;

        sqlContext.udf().register("mode", mode, DataTypes.StringType);

        sentenceDataFrame.createOrReplaceTempView("people");
        Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence), label FROM people").withColumnRenamed("UDF(sentence)", "sentence");
        newDF.show(false);


输出

  +--------------------------------------------+------+
  |sentence                                    |label |
  +--------------------------------------------+------+
  |Apex Tool Group jeevi pramod Apex Tool Group|  0   |
  |sandesh Apex Tool Group jeevi               |  1   |
  |harsha Nischay StanleyBlack                 |  2   |
  +--------------------------------------------+------+

【讨论】:

【参考方案3】:

仍然面临类似的问题

输入

艾伦·阿姆斯特朗 jeevi pramod Allen 桑德什·阿姆斯特朗·吉维 沙尼沙伊德瓦特

输出

Apex 工具组 Armstrong jeevi pramod Apex 工具组 Allen Apex 工具组 jeevi pramod Allen 山德什 Apex 工具组 jeevi 沙尼沙伊斯坦利布莱克

预期输出

Apex 工具组 Apex 工具组 jeevi pramod Apex 工具组 山德什 Apex 工具组 jeevi 沙尼沙伊斯坦利布莱克

当连续有多个替换时得到这个输出。

是否有任何其他必须遵循的方法才能获得正确的输出。?还是这是 UDF 的限制?

【讨论】:

以上是关于在 Apache Spark 中搜索和替换的主要内容,如果未能解决你的问题,请参考以下文章

Apache POI Excel - 搜索和替换文本

使用Spark进行搜狗日志分析实例——统计每个小时的搜索量

使用Spark进行搜狗日志分析实例——列出搜索不同关键词超过10个的用户及其搜索的关键词

解决Apache官网无法访问的问题

2018年前100名Apache Spark面试问题和解答(上)

2018年前100名Apache Spark面试问题和解答(上)