How to implement Stanford CoreNLP wrapper for Apache Spark using sparklyr?

Posted: 2017-02-25 03:56:20

Question:

I am trying to create an R package so that I can use the Stanford CoreNLP wrapper for Apache Spark (provided by databricks) from R. I am using the sparklyr package to connect to my local Spark instance. I created a package with the following dependency function:

spark_dependencies <- function(spark_version, scala_version, ...) {
  sparklyr::spark_dependency(
    jars = c(
      system.file(
        sprintf("stanford-corenlp-full/stanford-corenlp-3.6.0.jar"),
        package = "sparkNLP"
      ),
      system.file(
        sprintf("stanford-corenlp-full/stanford-corenlp-3.6.0-models.jar"),
        package = "sparkNLP"
      ),
      system.file(
        sprintf("stanford-corenlp-full/stanford-english-corenlp-2016-01-10-models.jar"),
        package = "sparkNLP"
      )
    ),
    packages = c(sprintf("databricks:spark-corenlp:0.2.0-s_%s", scala_version))
  )
}
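For spark_connect() to consult this hook, the package also has to register itself as a sparklyr extension when it is loaded. A minimal sketch of the usual .onLoad hook (assuming the package name sparkNLP used above):

# zzz.R: register the package as a sparklyr extension on load so that
# spark_dependencies() is picked up when spark_connect() is called
.onLoad <- function(libname, pkgname) {
  sparklyr::register_extension(pkgname)
}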

In the log I can see that both databricks packages are loaded along with their dependent jars. I extracted the full CoreNLP distribution into the stanford-corenlp-full folder, so all dependencies should be loaded correctly.

Ivy Default Cache set to: /Users/Bob/.ivy2/cache
The jars for the packages stored in: /Users/Bob/.ivy2/jars
:: loading settings :: url = jar:file:/Users/Bob/Library/Caches/spark/spark-2.0.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-csv_2.11 added as a dependency
com.amazonaws#aws-java-sdk-pom added as a dependency
databricks#spark-corenlp added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found com.databricks#spark-csv_2.11;1.3.0 in central
    found org.apache.commons#commons-csv;1.1 in central
    found com.univocity#univocity-parsers;1.5.1 in central
    found com.amazonaws#aws-java-sdk-pom;1.10.34 in central
    found databricks#spark-corenlp;0.2.0-s_2.11 in spark-packages
    found edu.stanford.nlp#stanford-corenlp;3.6.0 in central
    found com.io7m.xom#xom;1.2.10 in central
    found xml-apis#xml-apis;1.3.03 in central
    found xerces#xercesImpl;2.8.0 in central
    found xalan#xalan;2.7.0 in central
    found joda-time#joda-time;2.9 in central
    found de.jollyday#jollyday;0.4.7 in central
    found javax.xml.bind#jaxb-api;2.2.7 in central
    found com.googlecode.efficient-java-matrix-library#ejml;0.23 in central
    found javax.json#javax.json-api;1.0 in central
    found org.slf4j#slf4j-api;1.7.12 in central
    found com.google.protobuf#protobuf-java;2.6.1 in central
:: resolution report :: resolve 625ms :: artifacts dl 28ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-pom;1.10.34 from central in [default]
    com.databricks#spark-csv_2.11;1.3.0 from central in [default]
    com.google.protobuf#protobuf-java;2.6.1 from central in [default]
    com.googlecode.efficient-java-matrix-library#ejml;0.23 from central in [default]
    com.io7m.xom#xom;1.2.10 from central in [default]
    com.univocity#univocity-parsers;1.5.1 from central in [default]
    databricks#spark-corenlp;0.2.0-s_2.11 from spark-packages in [default]
    de.jollyday#jollyday;0.4.7 from central in [default]
    edu.stanford.nlp#stanford-corenlp;3.6.0 from central in [default]
    javax.json#javax.json-api;1.0 from central in [default]
    javax.xml.bind#jaxb-api;2.2.7 from central in [default]
    joda-time#joda-time;2.9 from central in [default]
    org.apache.commons#commons-csv;1.1 from central in [default]
    org.slf4j#slf4j-api;1.7.12 from central in [default]
    xalan#xalan;2.7.0 from central in [default]
    xerces#xercesImpl;2.8.0 from central in [default]
    xml-apis#xml-apis;1.3.03 from central in [default]
    :: evicted modules:
    xml-apis#xml-apis;2.0.2 by [xml-apis#xml-apis;1.3.03] in [default]
    joda-time#joda-time;2.1 by [joda-time#joda-time;2.9] in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   19  |   0   |   0   |   2   ||   16  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 16 already retrieved (0kB/17ms)
16/10/16 00:08:15 INFO SparkContext: Running Spark version 2.0.0
16/10/16 00:08:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/16 00:08:15 INFO SecurityManager: Changing view acls to: Bob
16/10/16 00:08:15 INFO SecurityManager: Changing modify acls to: Bob
16/10/16 00:08:15 INFO SecurityManager: Changing view acls groups to: 
16/10/16 00:08:15 INFO SecurityManager: Changing modify acls groups to: 
16/10/16 00:08:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Bob); groups with view permissions: Set(); users  with modify permissions: Set(Bob); groups with modify permissions: Set()
16/10/16 00:08:15 INFO Utils: Successfully started service 'sparkDriver' on port 54829.
16/10/16 00:08:15 INFO SparkEnv: Registering MapOutputTracker
16/10/16 00:08:15 INFO SparkEnv: Registering BlockManagerMaster
16/10/16 00:08:15 INFO DiskBlockManager: Created local directory at /private/var/folders/hs/yw76yd_95lscwclwg15n73tw0000gn/T/blockmgr-8df2c32b-19cb-4fdf-b321-85cb411b564a
16/10/16 00:08:15 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
16/10/16 00:08:15 INFO SparkEnv: Registering OutputCommitCoordinator
16/10/16 00:08:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/10/16 00:08:16 INFO SparkUI: Bound SparkUI to 127.0.0.1, and started at http://127.0.0.1:4040
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Library/Frameworks/R.framework/Versions/3.3/Resources/library/sparkNLP/stanford-corenlp-full/stanford-corenlp-3.6.0.jar at spark://127.0.0.1:54829/jars/stanford-corenlp-3.6.0.jar with timestamp 1476569296302
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Library/Frameworks/R.framework/Versions/3.3/Resources/library/sparkNLP/stanford-corenlp-full/stanford-corenlp-3.6.0-models.jar at spark://127.0.0.1:54829/jars/stanford-corenlp-3.6.0-models.jar with timestamp 1476569296303
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Library/Frameworks/R.framework/Versions/3.3/Resources/library/sparkNLP/stanford-corenlp-full/stanford-english-corenlp-2016-01-10-models.jar at spark://127.0.0.1:54829/jars/stanford-english-corenlp-2016-01-10-models.jar with timestamp 1476569296303
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/com.databricks_spark-csv_2.11-1.3.0.jar at spark://127.0.0.1:54829/jars/com.databricks_spark-csv_2.11-1.3.0.jar with timestamp 1476569296303
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/databricks_spark-corenlp-0.2.0-s_2.11.jar at spark://127.0.0.1:54829/jars/databricks_spark-corenlp-0.2.0-s_2.11.jar with timestamp 1476569296304
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar at spark://127.0.0.1:54829/jars/org.apache.commons_commons-csv-1.1.jar with timestamp 1476569296304
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar at spark://127.0.0.1:54829/jars/com.univocity_univocity-parsers-1.5.1.jar with timestamp 1476569296304
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/edu.stanford.nlp_stanford-corenlp-3.6.0.jar at spark://127.0.0.1:54829/jars/edu.stanford.nlp_stanford-corenlp-3.6.0.jar with timestamp 1476569296304
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/com.google.protobuf_protobuf-java-2.6.1.jar at spark://127.0.0.1:54829/jars/com.google.protobuf_protobuf-java-2.6.1.jar with timestamp 1476569296304
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/com.io7m.xom_xom-1.2.10.jar at spark://127.0.0.1:54829/jars/com.io7m.xom_xom-1.2.10.jar with timestamp 1476569296305
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/joda-time_joda-time-2.9.jar at spark://127.0.0.1:54829/jars/joda-time_joda-time-2.9.jar with timestamp 1476569296305
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/de.jollyday_jollyday-0.4.7.jar at spark://127.0.0.1:54829/jars/de.jollyday_jollyday-0.4.7.jar with timestamp 1476569296305
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/com.googlecode.efficient-java-matrix-library_ejml-0.23.jar at spark://127.0.0.1:54829/jars/com.googlecode.efficient-java-matrix-library_ejml-0.23.jar with timestamp 1476569296305
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/javax.json_javax.json-api-1.0.jar at spark://127.0.0.1:54829/jars/javax.json_javax.json-api-1.0.jar with timestamp 1476569296305
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/org.slf4j_slf4j-api-1.7.12.jar at spark://127.0.0.1:54829/jars/org.slf4j_slf4j-api-1.7.12.jar with timestamp 1476569296306
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/xml-apis_xml-apis-1.3.03.jar at spark://127.0.0.1:54829/jars/xml-apis_xml-apis-1.3.03.jar with timestamp 1476569296306
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/xerces_xercesImpl-2.8.0.jar at spark://127.0.0.1:54829/jars/xerces_xercesImpl-2.8.0.jar with timestamp 1476569296306
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/xalan_xalan-2.7.0.jar at spark://127.0.0.1:54829/jars/xalan_xalan-2.7.0.jar with timestamp 1476569296306
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Users/Bob/.ivy2/jars/javax.xml.bind_jaxb-api-2.2.7.jar at spark://127.0.0.1:54829/jars/javax.xml.bind_jaxb-api-2.2.7.jar with timestamp 1476569296306
16/10/16 00:08:16 INFO SparkContext: Added JAR file:/Library/Frameworks/R.framework/Versions/3.3/Resources/library/sparklyr/java/sparklyr-2.0-2.11.jar at spark://127.0.0.1:54829/jars/sparklyr-2.0-2.11.jar with timestamp 1476569296307
16/10/16 00:08:16 INFO Executor: Starting executor ID driver on host localhost
16/10/16 00:08:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54830.
16/10/16 00:08:16 INFO NettyBlockTransferService: Server created on 127.0.0.1:54830
16/10/16 00:08:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 127.0.0.1, 54830)
16/10/16 00:08:16 INFO BlockManagerMasterEndpoint: Registering block manager 127.0.0.1:54830 with 366.3 MB RAM, BlockManagerId(driver, 127.0.0.1, 54830)
16/10/16 00:08:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 127.0.0.1, 54830)
16/10/16 00:08:16 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
16/10/16 00:08:17 INFO HiveSharedState: Warehouse path is 'file:/Users/Bob/Documents/RPROJECTS/sparkNLP/spark-warehouse'


So I should be able to call the databricks spark-corenlp functions (located in com.databricks.spark.corenlp.functions).

But the function class does not seem to be found when I call:

library(sparkNLP) #the library I created
library(sparklyr)
sc <- spark_connect(master = "local")
invoke_new(sc,"com.databricks.spark.corenlp.functions")

I get an error message stating:

Error: failed to invoke spark command
16/10/16 00:12:11 WARN cannot find matching constructor for class com.databricks.spark.corenlp.functions. Candidates are:
16/10/16 00:12:11 ERROR <init> on com.databricks.spark.corenlp.functions failed

I am not sure whether the dependencies are not being loaded correctly or whether there is some other problem.
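One way to narrow this down is to ask the JVM directly whether the class is visible on the driver classpath at all; a minimal sketch using sparklyr's invoke_static (assuming the sc connection from above):

# If this throws ClassNotFoundException, the spark-corenlp jar never made it
# onto the driver classpath; if it returns a jobj, the class is loaded and
# the failure lies in how it is being invoked.
invoke_static(sc, "java.lang.Class", "forName",
              "com.databricks.spark.corenlp.functions")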

Any help would be greatly appreciated.

Below is my sessionInfo from RStudio:

R version 3.3.1 (2016-06-21)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: OS X 10.11.4 (El Capitan)

locale:
[1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] sparklyr_0.4   sparkNLP_0.1.0

loaded via a namespace (and not attached):
 [1] Rcpp_0.12.7     digest_0.6.10   dplyr_0.5.0     withr_1.0.2     rprojroot_1.0-2
 [6] assertthat_0.1  rappdirs_0.3.1  R6_2.2.0        DBI_0.5-1       magrittr_1.5   
[11] config_0.2      tools_3.3.1     readr_1.0.0     yaml_2.1.13     parallel_3.3.1 
[16] tibble_1.2     


Answer:

com.databricks.spark.corenlp.functions is an object, not a class, so invoking its constructor makes no sense. That is essentially what the error message says:

Error: java.lang.Exception: No matched constructor found for class com.databricks.spark.corenlp.functions

Instead, you should use invoke_static to access the functions defined there, for example:

invoke_static(sc, "com.databricks.spark.corenlp.functions", "cleanxml")
<jobj[15]>
org.apache.spark.sql.expressions.UserDefinedFunction
UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
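What comes back is a reference to an org.apache.spark.sql.expressions.UserDefinedFunction; as the wrapper below shows, it can be applied to a Column by invoking its apply method.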

Sample data borrowed from the official README:

df <- copy_to(sc, tibble(
  id = 1,
  text = "<xml>Stanford University is located in California. It is a great university.</xml>"
))

You can define a wrapper like this:

sdf_cleanxml <- function(df, input_col, output_col) {
  sc <- df$src$con
  clean_xml <- invoke_static(sc, "com.databricks.spark.corenlp.functions", "cleanxml")
  arg <- list(invoke_static(sc, "org.apache.spark.sql.functions", "col", input_col))
  expr <- invoke(clean_xml, "apply", arg)
  df %>%
    spark_dataframe() %>%
    invoke("withColumn", output_col, expr) %>%
    sdf_register()
}

And call it as follows:

sdf_cleanxml(df, "text", "text_clean")
# Source: spark<?> [?? x 3]
    id text                                 text_clean                         
  <dbl> <chr>                                <chr>                              
1     1 <xml>Stanford University is located… Stanford University is located in …

In practice, though, it may be simpler to just register the required functions:

register_core_nlp <- function(sc) {
  funs <- c(
    "cleanxml", "tokenize", "ssplit", "pos", "lemma", "ner", "depparse",
    "coref", "natlog", "openie", "sentiment"
  )
  udf_registration <- sparklyr::invoke(sparklyr::spark_session(sc), "udf")
  for (fun in funs) {
    sparklyr::invoke(
      udf_registration, "register", fun,
      sparklyr::invoke_static(sc, "com.databricks.spark.corenlp.functions", fun)
    )
  }
}


register_core_nlp(sc)

and let the SQL translation do the rest:

df %>% 
  transmute(doc = cleanxml(text)) %>%
  transmute(sen = explode(ssplit(doc))) %>%
  mutate(words = tokenize(sen), ner_tags = ner(sen), sentiment = sentiment(sen))
# Source: spark<?> [?? x 4]
  sen                                            words      ner_tags   sentiment
  <chr>                                          <list>     <list>         <int>
1 Stanford University is located in California . <list [7]> <list [7]>         1
2 It is a great university .                     <list [6]> <list [6]>         4
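Because the functions are registered as ordinary Spark SQL UDFs, the usual sparklyr workflow applies from here on; for instance, a minimal sketch pulling the annotated sentences back into R with collect(), reusing the pipeline above:

# Tokens arrive as list-columns in the resulting tibble.
annotated <- df %>%
  transmute(doc = cleanxml(text)) %>%
  transmute(sen = explode(ssplit(doc))) %>%
  mutate(words = tokenize(sen), sentiment = sentiment(sen)) %>%
  collect()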

