What is the default value for spark.sql.columnNameOfCorruptRecord?


【Title】What is the default value for spark.sql.columnNameOfCorruptRecord? 【Posted】: 2021-01-19 06:52:06 【Question】:

I have read the documentation, but even with a Google search I cannot find the default value of spark.sql.columnNameOfCorruptRecord. Second question: how does PERMISSIVE mode work when spark.sql.columnNameOfCorruptRecord is empty or null?

【Comments】:

【Answer 1】:

According to the code (as of 19/01/2021), it is _corrupt_record:

val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
  .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
  "to parse.")
  .version("1.2.0")
  .stringConf
  .createWithDefault("_corrupt_record")
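To see how the default interacts with per-reader settings, here is a minimal Python sketch. The resolver function itself is hypothetical, but the precedence it models matches Spark's behavior: a reader-level `columnNameOfCorruptRecord` option wins over the session config, which falls back to the built-in default.

```python
SPARK_DEFAULT_CORRUPT_COLUMN = "_corrupt_record"  # default from SQLConf

def resolve_corrupt_column_name(reader_option=None, session_conf=None):
    """Hypothetical sketch: the per-reader option takes precedence,
    then the session-level spark.sql.columnNameOfCorruptRecord value,
    then the built-in default."""
    if reader_option:
        return reader_option
    if session_conf:
        return session_conf
    return SPARK_DEFAULT_CORRUPT_COLUMN

# When neither level sets a name, the default applies:
print(resolve_corrupt_column_name())  # _corrupt_record
```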

As for how PERMISSIVE mode works, you can see it in FailSafeParser[T]:

def parse(input: IN): Iterator[InternalRow] = {
  try {
    rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
  } catch {
    case e: BadRecordException => mode match {
      case PermissiveMode =>
        Iterator(toResultRow(e.partialResult(), e.record))
      case DropMalformedMode =>
        Iterator.empty
      case FailFastMode =>
        throw new SparkException("Malformed records are detected in record parsing. " +
          s"Parse Mode: ${FailFastMode.name}. To process malformed records as null " +
          "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
    }
  }
}


private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
  if (corruptFieldIndex.isDefined) {
    (row, badRecord) => {
      var i = 0
      while (i < actualSchema.length) {
        val from = actualSchema(i)
        resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
        i += 1
      }
      resultRow(corruptFieldIndex.get) = badRecord()
      resultRow
    }
  } else {
    (row, _) => row.getOrElse(nullResult)
  }
}

If the column name is not specified, it falls back to the default defined in the configuration.
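To make the control flow above concrete, here is a minimal Python sketch of the same mode dispatch. The names (`parse_record`, `kv_parser`, the exception fields) are hypothetical; it only mirrors the logic shown in the Scala source: PERMISSIVE keeps the partial row and stores the raw input in the corrupt-record column, DROPMALFORMED drops the row, and FAILFAST raises.

```python
class BadRecordException(Exception):
    def __init__(self, record, partial_result=None):
        self.record = record                  # raw un-parsed input
        self.partial_result = partial_result  # fields that did parse, if any

def parse_record(raw, parser, mode, corrupt_col="_corrupt_record"):
    """Sketch of FailSafeParser.parse for one input record."""
    try:
        row = parser(raw)
        row[corrupt_col] = None  # good rows get null in the corrupt column
        return [row]
    except BadRecordException as e:
        if mode == "PERMISSIVE":
            row = dict(e.partial_result or {})
            row[corrupt_col] = e.record  # keep the raw text for inspection
            return [row]
        if mode == "DROPMALFORMED":
            return []
        raise RuntimeError(
            f"Malformed records are detected in record parsing. "
            f"Parse Mode: {mode}.") from e

# A toy parser that only accepts 'key=value' input:
def kv_parser(raw):
    if "=" not in raw:
        raise BadRecordException(record=raw)
    k, v = raw.split("=", 1)
    return {k: v}

print(parse_record("oops", kv_parser, "PERMISSIVE"))
# [{'_corrupt_record': 'oops'}]
print(parse_record("oops", kv_parser, "DROPMALFORMED"))
# []
```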

【Comments】:
