What is the default value for spark.sql.columnNameOfCorruptRecord?
Asked: 2021-01-19 06:52:06

I have read the documentation, but neither there nor via a Google search could I find the default value of spark.sql.columnNameOfCorruptRecord.

A second question: how does PERMISSIVE mode work when spark.sql.columnNameOfCorruptRecord is empty or null?
Answer 1: According to the code (as of 19/01/2021), the default is _corrupt_record:
  val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
    .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
      "to parse.")
    .version("1.2.0")
    .stringConf
    .createWithDefault("_corrupt_record")
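As a minimal sketch of how a config entry with a default behaves (this is illustrative Python, not Spark's actual `buildConf` implementation; the `ConfEntry` class and `read` method are invented names):

```python
class ConfEntry:
    """Toy analogue of a Spark SQL config entry created with createWithDefault."""

    def __init__(self, key, default):
        self.key = key
        self.default = default

    def read(self, settings):
        # Fall back to the default when the key is not explicitly set.
        return settings.get(self.key, self.default)


COLUMN_NAME_OF_CORRUPT_RECORD = ConfEntry(
    "spark.sql.columnNameOfCorruptRecord", "_corrupt_record")

print(COLUMN_NAME_OF_CORRUPT_RECORD.read({}))  # unset -> "_corrupt_record"
print(COLUMN_NAME_OF_CORRUPT_RECORD.read(
    {"spark.sql.columnNameOfCorruptRecord": "_bad"}))  # explicit override wins
```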
As for how PERMISSIVE mode works, you can see it in FailSafeParser[T]:
  def parse(input: IN): Iterator[InternalRow] = {
    try {
      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
    } catch {
      case e: BadRecordException => mode match {
        case PermissiveMode =>
          Iterator(toResultRow(e.partialResult(), e.record))
        case DropMalformedMode =>
          Iterator.empty
        case FailFastMode =>
          throw new SparkException("Malformed records are detected in record parsing. " +
            s"Parse Mode: ${FailFastMode.name}. To process malformed records as null " +
            "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
      }
    }
  }
  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
    if (corruptFieldIndex.isDefined) {
      (row, badRecord) => {
        var i = 0
        while (i < actualSchema.length) {
          val from = actualSchema(i)
          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
          i += 1
        }
        resultRow(corruptFieldIndex.get) = badRecord()
        resultRow
      }
    } else {
      (row, _) => row.getOrElse(nullResult)
    }
  }
If the column name is not specified, it falls back to the default value defined in the configuration.
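The behavior of the three modes in the parser above can be sketched in plain Python (a toy JSON parser, not Spark's FailSafeParser; the function name parse_record and its signature are invented for illustration):

```python
import json

PERMISSIVE, DROPMALFORMED, FAILFAST = "PERMISSIVE", "DROPMALFORMED", "FAILFAST"


def parse_record(raw, fields, mode, corrupt_col="_corrupt_record"):
    """Parse one raw JSON record into rows, mimicking the three parse modes."""
    try:
        data = json.loads(raw)
        row = {f: data.get(f) for f in fields}
        row[corrupt_col] = None  # well-formed record: corrupt column stays null
        return [row]
    except json.JSONDecodeError as e:
        if mode == PERMISSIVE:
            # Emit a "partial result" row: nulls for the data fields,
            # with the raw record preserved in the corrupt-record column.
            row = {f: None for f in fields}
            row[corrupt_col] = raw
            return [row]
        if mode == DROPMALFORMED:
            return []  # silently drop the malformed record
        raise RuntimeError(
            f"Malformed records are detected in record parsing. Parse Mode: {mode}.") from e


print(parse_record('{"a": 1}', ["a"], PERMISSIVE))
print(parse_record('not json', ["a"], PERMISSIVE))
print(parse_record('not json', ["a"], DROPMALFORMED))
```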