在 Spark 上使用 Scala 在 Dataframe 中拆分字符串

Posted

技术标签:

【中文标题】在 Spark 上使用 Scala 在 Dataframe 中拆分字符串【英文标题】:Split string in Dataframe using Scala on Spark 【发布时间】:2019-03-04 05:08:38 【问题描述】:

我有一个包含 100 多列的日志文件。其中我只需要两列'_raw'和'_time',所以我创建了将日志文件加载为“csv”DF。

第 1 步:

scala> val log = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("soa_prod_diag_10_jan.csv")
log: org.apache.spark.sql.DataFrame = [ARRAffinity: string, CoordinatorNonSecureURL: string ... 126 more fields]

第 2 步: 我将 DF 注册为临时表 log.createOrReplaceTempView("logs")

第 3 步:我提取了两个必需的列“_raw”和“_time”

scala> val sqlDF = spark.sql("select _raw, _time from logs")
sqlDF: org.apache.spark.sql.DataFrame = [_raw: string, _time: string]

scala> sqlDF.show(1, false)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|_raw                                                                                                                                                                                                                                                                                                                                                                                                |_time|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] [tid: [ACTIVE].ExecuteThread: '28' for queue: 'weblogic.kernel.Default (self-tuning)'] [userId: <anonymous>] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b,0] [partition-name: DOMAIN] [tenant-name: GLOBAL] Aggregation Server Not Available. Failed to get remote aggregator[[|null |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
only showing top 1 row

我的要求:

我需要拆分 '_raw' 列中的字符串以生成 [2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [错误] [OSB-473003] [oracle.osb.statistics.statistics] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b]列名分别为 a, b, c, d, e, f

同时从“_raw”和“_time”中删除所有空值

您的回答将不胜感激:)

【问题讨论】:

请添加一个完整的日志字符串,并注明您需要什么作为_time。 来自 _raw 列中的以下字符串 [2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b] A 列 [2019-01-10T23:59:59.998-06:00] B 列 [xx_yyy_zz_sss_ra10] C 列 [错误] D 列 [OSB-473003] 列E [oracle.osb.statistics.statistics] F 列 [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b] 【参考方案1】:

你可以分割函数,并用空格分割 _raw。这将返回一个数组,然后您可以从该数组中提取值。您还可以使用 regexp_extract 函数从日志消息中提取值。两种方式如下所示。希望对您有所帮助。

//Creating Test Data
val df = Seq("[2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] [tid: [ACTIVE].ExecuteThread: '28' for queue: 'weblogic.kernel.Default (self-tuning)'] [userId: <anonymous>] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b,0] [partition-name: DOMAIN] [tenant-name: GLOBAL] Aggregation Server Not Available. Failed to get remote aggregator[[")
  .toDF("_raw")

val splitDF = df.withColumn("split_raw_arr", split($"_raw", " "))
  .withColumn("A", $"split_raw_arr"(0))
  .withColumn("B", $"split_raw_arr"(1))
  .withColumn("C", $"split_raw_arr"(2))
  .withColumn("D", $"split_raw_arr"(3))
  .withColumn("E", $"split_raw_arr"(4))
  .drop("_raw", "split_raw_arr")

splitDF.show(false)

+-------------------------------+--------------------+-------+------------+----------------------------------+
|A                              |B                   |C      |D           |E                                 |
+-------------------------------+--------------------+-------+------------+----------------------------------+
|[2019-01-10T23:59:59.998-06:00]|[xx_yyy_zz_sss_ra10]|[ERROR]|[OSB-473003]|[oracle.osb.statistics.statistics]|
+-------------------------------+--------------------+-------+------------+----------------------------------+

val extractedDF = df
  .withColumn("a", regexp_extract($"_raw", "\\[(.*?)\\]",1))
  .withColumn("b", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\]",2))
  .withColumn("c", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",3))
  .withColumn("d", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",4))
  .withColumn("e", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",5))
  .withColumn("f", regexp_extract($"_raw", "(?<=ecid: )(.*?)(?=,)",1))
  .drop("_raw")

+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
|a                            |b                 |c    |d         |e                               |f                                            |
+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
|2019-01-10T23:59:59.998-06:00|xx_yyy_zz_sss_ra10|ERROR|OSB-473003|oracle.osb.statistics.statistics|92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b|
+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+

【讨论】:

非常感谢...再次提问;;; [2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [错误] [OSB-473003] [oracle.osb.statistics.statistics] ,0:1:5:3:6:4:6: 4]”。如果我从上面的字符串中提取了 ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b 那么我需要使用什么索引将其放入“F”列中 为此,我希望您采用第二种方法,因为数据中有多个空格。 对于 f 列,如果我只需要 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b 我将如何处理? 这似乎是一个字母数字值,但我将从 "ecid:" 到 "," 中的所有内容,因为我不确定所有值是否相同。我已经编辑了答案。

以上是关于在 Spark 上使用 Scala 在 Dataframe 中拆分字符串的主要内容,如果未能解决你的问题,请参考以下文章

Spark:在scala中的数据帧上使用动态过滤器进行聚合

在 Spark 作业服务器上调用程序的 Scala 运行时错误

如何在 EMR 上使用 Spark 3 为 Scala 对象解决“加载类失败”

Scala和Spark的大数据分析

无法在 Spark-2.2.0 - Scala-2.11.8 上运行单元测试(scalatest)

在linux上如何配置spark环境,在linux上安装scala和spark老是失败