使用 Apache spark java 搜索替换

Posted

技术标签:

【中文标题】使用 Apache spark java 搜索替换【英文标题】:search replace using Apache spark java 【发布时间】:2017-05-08 10:35:10 【问题描述】:

问题陈述:

我们需要在一个大型数据集(50000 行)上将一行中单词的同义词替换为等效的单词(来自大量同义词列表 ~40000+ 键值对)。

示例: 输入

Allen jeevi pramod Allen Armstrong
sandesh Armstrong jeevi
harsha Nischay DeWALT

同义词列表(键值对) //我们有40000个条目

Key         |   Value 
------------------------------------
Allen       |   Apex Tool Group
Armstrong   |   Columbus McKinnon
DeWALT      |   StanleyBlack

输入必须使用上述同义词列表,输出应如下所示。

Expected Output

Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
sandesh Columbus McKinnon jeevi
harsha Nischay StanleyBlack

我们尝试了 3 种方法,它们都有自己的局限性

方法 1

使用 UDF

    public void test () 
            List<Row> data = Arrays.asList(
            RowFactory.create(0, "Allen jeevi pramod Allen Armstrong"),
            RowFactory.create(1, "sandesh Armstrong jeevi"),
            RowFactory.create(2, "harsha Nischay DeWALT")
       );

        StructType schema = new StructType(new StructField[] 
            new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) 
       );
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

        List<Row> data2 = Arrays.asList(
            RowFactory.create("Allen", "Apex Tool Group"),
            RowFactory.create("Armstrong","Columbus McKinnon"),
            RowFactory.create("DeWALT","StanleyBlack")
        );

        StructType schema2 = new StructType(new StructField[] 
            new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
            new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) 
        );
        Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

        UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() 
        
            private static final long serialVersionUID = -5239951370238629896L;

            @Override
            public Boolean call(String t1, String t2) throws Exception 
                return t1.contains(t2);
            
        ;
        spark.udf().register("contains", contains, DataTypes.BooleanType);

        UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, 
        String, String, String>() 
            private static final long serialVersionUID = -2882956931420910207L;

        @Override
        public String call(String t1, String t2, String t3) throws Exception 
            return t1.replaceAll(t2, t3);
        
    ;

    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);

Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
                                       .withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
                                       .select(col("sentence_replaced"));

joined.show(false);

`

Input
    Allen jeevi pramod Allen Armstrong
    sandesh Armstrong jeevi
    harsha Nischay DeWALT

Expected Output
    Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

Actual Output
    Apex Tool Group jeevi pramod Apex Tool Group Armstrong
    Allen jeevi pramod Allen Columbus McKinnon
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

方法 1 的问题,如果输入数据集中有多个同义词键,则会创建很多行,如上面的示例输出所示。 预计只有一行全部替换

方法 2。 使用 ImmutableMap 和替换函数:这里我们在 ImmutableMap 函数中的 hashmap 中保留键和值对,我们调用替换函数来替换所有的东西 但是如果一行包含多个键,那么它会忽略整个行而不替换单个键...

try 

        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder()
                .appName("JavaTokenizerExample").getOrCreate();

        HashMap<String, String> options = new HashMap<String, String>();
        options.put("header", "true");
        Dataset<Row> dataFileContent = sqlContext.load("com.databricks.spark.csv", options);
        dataFileContent=dataFileContent.withColumn("ManufacturerSource", regexp_replace(col("ManufacturerSource"),"[^a-zA-Z0-9\\s+]",""));
        dataFileContent= dataFileContent.na().replace("ManufacturerSource",ImmutableMap.<String, String>builder()
            .put("Allen", "Apex Tool Group"),
            .put("Armstrong","Columbus McKinnon"),
            .put("DeWALT","StanleyBlack")
            //Here we have 40000 entries
            .build()

          );
          dataFileContent.show(10,false);

     catch (Exception e) 
        e.printStackTrace();
    

这里是示例代码和输出:

Input
    Allen jeevi pramod Allen Armstrong
    sandesh Armstrong jeevi
    harsha Nischay DeWALT

Expected Output
    Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

Actual Output
    Allen jeevi pramod Allen Armstrong
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

方法 3

在 UDF 中使用全部替换

public static void main(String[] args) 
          JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("JoinFunctions").setMaster("local[*]"));
          SQLContext sqlContext = new SQLContext(sc);
          SparkSession spark = SparkSession.builder().appName("StringSimiliarityExample").getOrCreate();


            Dataset<Row> sourceFileContent = sqlContext.read()
                        .format("com.databricks.spark.csv")
                        .option("header", "true")
                        .load("source100.csv");
            sourceFileContent.show(false);

        StructType schema = new StructType(new StructField[] 
        new StructField("label", DataTypes.IntegerType, false,
                Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false,
                Metadata.empty()) );
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
        UDF1 mode = new UDF1<String, String>() 
            public String call(final String types) throws Exception 
                return types.replaceAll("Allen", "Apex Tool Group")
                .replaceAll("Armstrong","Columbus McKinnon")
                .replaceAll("DeWALT","StanleyBlack")
                //40000 more entries.....
            
        ;

        sqlContext.udf().register("mode", mode, DataTypes.StringType);

        sentenceDataFrame.createOrReplaceTempView("people");
        Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence), label FROM people").withColumnRenamed("UDF(sentence)", "sentence");
        newDF.show(false);
    

输出 *** 异常。

在这里,我们得到了 *** 异常。因为它类似于递归函数调用。

如果有任何其他创新方法可以帮助解决此问题,请告诉我们。

【问题讨论】:

【参考方案1】:

两者都不起作用,因为您总是遇到子字符串匹配的问题。例如:

ABC -> 德文 ABCDE -> ABC

带有文本“ABCDEF HIJ KLM”的输出是什么?它应该与输入相同,但您的方法最多会输出“DEDEF HIJ KLM”,最坏的情况是您将进行双重替换并获得“DEF HIJ KLM”。两种情况都不正确。

您可以通过为替换添加边界来改进这一点,也许使用正则表达式。然而,更好的方法是首先正确标记您的输入,应用标记替换(可以完全匹配),然后取消标记回原始格式。这可能就像按空间分割一样简单,但是您应该适当地说明可能存在哪些令牌边界。 (停止,连字符等)。

【讨论】:

您提出的某些方面是正确的。就像双重替换,但我们已经想到了几种方法。 1.所有子串匹配保持在前2.键前后添加空格。但这是面临的核心问题。问题是当我们拥有大型数据集时。 如果你标记它永远不会成为问题 tokenization 并不能解决问题,我们这里面临的核心问题是同义词列表的记录数量应该用于替换,对于少数同义词列表(键值对)如400 条记录它适用于更大的集合,比如 40000 条它失败了。

以上是关于使用 Apache spark java 搜索替换的主要内容,如果未能解决你的问题,请参考以下文章

Apache POI Excel - 搜索和替换文本

如何使用scala在Apache spark中用空字符串(“”)替换空值[重复]

通过 C# 的 Apache Spark 查询 [关闭]

使用Spark进行搜狗日志分析实例——列出搜索不同关键词超过10个的用户及其搜索的关键词

使用java开发spark的wordcount程序

使用Spark进行搜狗日志分析实例——统计每个小时的搜索量